AsyncMemoryGraph

Struct AsyncMemoryGraph 

Source
pub struct AsyncMemoryGraph { /* private fields */ }
Expand description

Async interface for interacting with the memory graph

AsyncMemoryGraph provides a fully async, thread-safe API for managing conversation sessions, prompts, responses, agents, templates, and their relationships in a graph structure.

All operations are non-blocking and can be executed concurrently without performance degradation.

§Examples

use llm_memory_graph::engine::AsyncMemoryGraph;
use llm_memory_graph::Config;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::new("./data/my_graph.db");
    let graph = AsyncMemoryGraph::open(config).await?;

    let session = graph.create_session().await?;
    let prompt_id = graph.add_prompt(session.id, "What is Rust?".to_string(), None).await?;
    Ok(())
}

Implementations§

Source§

impl AsyncMemoryGraph

Source

pub async fn open(config: Config) -> Result<Self>

Open or create an async memory graph with the given configuration

This will create the database directory if it doesn’t exist and initialize all necessary storage trees. Operations use Tokio’s async runtime.

§Errors

Returns an error if:

  • The database path is invalid or inaccessible
  • Storage initialization fails
  • Existing data is corrupted
§Examples
use llm_memory_graph::engine::AsyncMemoryGraph;
use llm_memory_graph::Config;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::new("./data/graph.db");
    let graph = AsyncMemoryGraph::open(config).await?;
    Ok(())
}
Source

pub async fn with_observatory( config: Config, publisher: Option<Arc<dyn EventPublisher>>, obs_config: ObservatoryConfig, ) -> Result<Self>

Open graph with Observatory integration

§Examples
use llm_memory_graph::engine::AsyncMemoryGraph;
use llm_memory_graph::observatory::{ObservatoryConfig, InMemoryPublisher};
use llm_memory_graph::Config;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::default();
    let publisher = Arc::new(InMemoryPublisher::new());
    let obs_config = ObservatoryConfig::new().enabled();

    let graph = AsyncMemoryGraph::with_observatory(
        config,
        Some(publisher),
        obs_config
    ).await?;
    Ok(())
}
Source

pub fn get_metrics(&self) -> Option<MetricsSnapshot>

Get metrics snapshot

Source

pub async fn create_session(&self) -> Result<ConversationSession>

Create a new conversation session asynchronously

Sessions are used to group related prompts and responses together. Each session has a unique ID and can store custom metadata.

§Errors

Returns an error if the session cannot be persisted to storage.

§Examples
let session = graph.create_session().await?;
println!("Created session: {}", session.id);
Source

pub async fn create_session_with_metadata( &self, metadata: HashMap<String, String>, ) -> Result<ConversationSession>

Create a session with custom metadata asynchronously

§Examples
let mut metadata = HashMap::new();
metadata.insert("user_id".to_string(), "123".to_string());
let session = graph.create_session_with_metadata(metadata).await?;
Source

pub async fn get_session( &self, session_id: SessionId, ) -> Result<ConversationSession>

Get a session by ID asynchronously

This will first check the in-memory cache, then fall back to storage.

§Errors

Returns an error if the session doesn’t exist or storage retrieval fails.

Source

pub async fn add_prompt( &self, session_id: SessionId, content: String, metadata: Option<PromptMetadata>, ) -> Result<NodeId>

Add a prompt node to a session asynchronously

§Examples
let prompt_id = graph.add_prompt(
    session.id,
    "Explain async/await in Rust".to_string(),
    None
).await?;
Source

pub async fn add_prompts_batch( &self, prompts: Vec<(SessionId, String)>, ) -> Result<Vec<NodeId>>

Add multiple prompts concurrently (batch operation)

This method processes all prompts in parallel for maximum throughput.

§Examples
let prompts = vec![
    (session.id, "First prompt".to_string()),
    (session.id, "Second prompt".to_string()),
];
let ids = graph.add_prompts_batch(prompts).await?;
Source

pub async fn add_response( &self, prompt_id: NodeId, content: String, token_usage: TokenUsage, metadata: Option<ResponseMetadata>, ) -> Result<NodeId>

Add a response node linked to a prompt asynchronously

§Examples
let usage = TokenUsage::new(10, 50);
let response_id = graph.add_response(
    prompt_id,
    "Async operations are non-blocking!".to_string(),
    usage,
    None
).await?;
Source

pub async fn add_agent(&self, agent: AgentNode) -> Result<AgentId>

Add an agent node asynchronously

§Examples
let agent = AgentNode::new(
    "CodeReviewer".to_string(),
    "code-review".to_string(),
    vec!["rust".to_string(), "python".to_string()]
);
let agent_id = graph.add_agent(agent).await?;
Source

pub async fn update_agent(&self, agent: AgentNode) -> Result<()>

Update an existing agent asynchronously

This invalidates the cache entry for the agent to ensure consistency.

Source

pub async fn assign_agent_to_prompt( &self, prompt_id: NodeId, agent_node_id: NodeId, ) -> Result<()>

Assign an agent to handle a prompt asynchronously

Creates a HandledBy edge from the prompt to the agent.

Source

pub async fn transfer_to_agent( &self, from_response: NodeId, to_agent_node_id: NodeId, ) -> Result<()>

Transfer from one agent to another asynchronously

Creates a TransfersTo edge representing agent handoff.

Source

pub async fn create_template( &self, template: PromptTemplate, ) -> Result<TemplateId>

Create a new prompt template asynchronously

§Examples
let template = PromptTemplate::new(
    "Greeting".to_string(),
    "Hello {{name}}!".to_string(),
    vec![]
);
let template_id = graph.create_template(template).await?;
Source

pub async fn update_template(&self, template: PromptTemplate) -> Result<()>

Update an existing template asynchronously

This invalidates the cache entry for the template to ensure consistency.

Source

pub async fn get_template( &self, template_id: TemplateId, ) -> Result<PromptTemplate>

Get a template by its template ID asynchronously

Source

pub async fn get_template_by_node_id( &self, node_id: NodeId, ) -> Result<PromptTemplate>

Get a template by its node ID asynchronously

Source

pub async fn create_template_from_parent( &self, template: PromptTemplate, parent_node_id: NodeId, ) -> Result<TemplateId>

Create template from parent (inheritance) asynchronously

Link a prompt to the template it was instantiated from

Source

pub async fn add_tool_invocation(&self, tool: ToolInvocation) -> Result<NodeId>

Add a tool invocation node asynchronously

§Examples
let tool = ToolInvocation::new(
    response_id,
    "calculator".to_string(),
    serde_json::json!({"operation": "add", "a": 2, "b": 3})
);
let tool_id = graph.add_tool_invocation(tool).await?;
Source

pub async fn update_tool_invocation(&self, tool: ToolInvocation) -> Result<()>

Update tool invocation with results asynchronously

This invalidates the cache entry for the tool to ensure consistency.

Source

pub async fn get_node(&self, id: &NodeId) -> Result<Option<Node>>

Get a node by ID asynchronously (cache-aware)

This method first checks the cache for the node. If found in cache, it returns immediately (< 1ms latency). Otherwise, it loads from storage and populates the cache for future requests.

§Performance
  • Cache hit: < 1ms latency
  • Cache miss: 2-10ms latency (loads from storage)
§Examples
let node = graph.get_node(&node_id).await?;
Source

pub async fn get_edge(&self, id: &EdgeId) -> Result<Option<Edge>>

Get an edge by ID asynchronously (cache-aware)

This method first checks the cache for the edge. If found in cache, it returns immediately. Otherwise, it loads from storage and populates the cache for future requests.

§Performance
  • Cache hit: < 1ms latency
  • Cache miss: 2-10ms latency (loads from storage)
Source

pub async fn add_edge( &self, from: NodeId, to: NodeId, edge_type: EdgeType, ) -> Result<()>

Add a custom edge asynchronously

Source

pub async fn get_outgoing_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>

Get all outgoing edges from a node asynchronously

Source

pub async fn get_incoming_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>

Get all incoming edges to a node asynchronously

Source

pub async fn get_session_nodes( &self, session_id: &SessionId, ) -> Result<Vec<Node>>

Get all nodes in a session asynchronously

Source

pub async fn store_nodes_batch(&self, nodes: Vec<Node>) -> Result<Vec<NodeId>>

Store multiple nodes concurrently asynchronously

This method leverages async concurrency to store multiple nodes in parallel.

Source

pub async fn store_edges_batch(&self, edges: Vec<Edge>) -> Result<()>

Store multiple edges concurrently asynchronously

Source

pub async fn add_responses_batch( &self, responses: Vec<(NodeId, String, TokenUsage)>, ) -> Result<Vec<NodeId>>

Add multiple responses concurrently (batch operation)

This method processes all responses in parallel for maximum throughput. Each response is linked to its corresponding prompt.

§Examples
let responses = vec![
    (prompt1, "Answer 1".to_string(), TokenUsage::new(10, 50)),
    (prompt2, "Answer 2".to_string(), TokenUsage::new(15, 60)),
];
let ids = graph.add_responses_batch(responses).await?;
Source

pub async fn create_sessions_batch( &self, count: usize, ) -> Result<Vec<ConversationSession>>

Create multiple sessions concurrently (batch operation)

This method creates multiple sessions in parallel for maximum throughput.

§Examples
let sessions = graph.create_sessions_batch(5).await?;
assert_eq!(sessions.len(), 5);
Source

pub async fn get_nodes_batch( &self, ids: Vec<NodeId>, ) -> Result<Vec<Option<Node>>>

Retrieve multiple nodes concurrently (batch operation)

This method fetches all nodes in parallel for maximum throughput. Returns nodes in the same order as the input IDs. Missing nodes are None.

§Examples
let ids = vec![id1, id2];
let nodes = graph.get_nodes_batch(ids).await?;
assert_eq!(nodes.len(), 2);
Source

pub async fn delete_nodes_batch(&self, ids: Vec<NodeId>) -> Result<()>

Delete multiple nodes concurrently (batch operation)

This method deletes all nodes in parallel for maximum throughput. Note: This does not cascade delete related edges - you may want to delete related edges separately.

§Examples
let ids = vec![id1, id2];
graph.delete_nodes_batch(ids).await?;
Source

pub async fn add_conversations_batch( &self, conversations: Vec<((SessionId, String), Option<(String, TokenUsage)>)>, ) -> Result<Vec<(NodeId, Option<NodeId>)>>

Process a mixed batch of prompts and responses concurrently

This is an advanced operation that allows you to add prompts and their responses in a single concurrent batch operation. This is useful for bulk importing conversation data.

§Examples
let conversations = vec![
    (
        (session.id, "What is Rust?".to_string()),
        Some(("Rust is a systems programming language".to_string(), TokenUsage::new(5, 30))),
    ),
    (
        (session.id, "How does async work?".to_string()),
        Some(("Async in Rust is zero-cost".to_string(), TokenUsage::new(6, 25))),
    ),
];
let results = graph.add_conversations_batch(conversations).await?;
Source

pub async fn flush(&self) -> Result<()>

Flush any pending writes asynchronously

Source

pub async fn stats(&self) -> Result<StorageStats>

Get storage statistics asynchronously

Source

pub fn query(&self) -> AsyncQueryBuilder

Create a new async query builder for querying the graph

Returns an AsyncQueryBuilder that provides a fluent API for building and executing queries with filtering, pagination, and streaming support.

§Examples
use llm_memory_graph::engine::AsyncMemoryGraph;
use llm_memory_graph::types::NodeType;
use llm_memory_graph::Config;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let graph = AsyncMemoryGraph::open(Config::default()).await?;
    let session = graph.create_session().await?;

    // Query with fluent API
    let prompts = graph.query()
        .session(session.id)
        .node_type(NodeType::Prompt)
        .limit(10)
        .execute()
        .await?;

    println!("Found {} prompts", prompts.len());
    Ok(())
}

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of the pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a value with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more