pub struct AsyncMemoryGraph { /* private fields */ }
Async interface for interacting with the memory graph
AsyncMemoryGraph provides a fully async, thread-safe API for managing conversation
sessions, prompts, responses, agents, templates, and their relationships in a graph structure.
All operations are non-blocking and can be executed concurrently.
§Examples
use llm_memory_graph::engine::AsyncMemoryGraph;
use llm_memory_graph::Config;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::new("./data/my_graph.db");
let graph = AsyncMemoryGraph::open(config).await?;
let session = graph.create_session().await?;
let prompt_id = graph.add_prompt(session.id, "What is Rust?".to_string(), None).await?;
Ok(())
}Implementations§
impl AsyncMemoryGraph
pub async fn open(config: Config) -> Result<Self>
Open or create an async memory graph with the given configuration
This will create the database directory if it doesn’t exist and initialize all necessary storage trees. Operations use Tokio’s async runtime.
§Errors
Returns an error if:
- The database path is invalid or inaccessible
- Storage initialization fails
- Existing data is corrupted
§Examples
use llm_memory_graph::engine::AsyncMemoryGraph;
use llm_memory_graph::Config;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::new("./data/graph.db");
let graph = AsyncMemoryGraph::open(config).await?;
Ok(())
}
pub async fn with_observatory(
    config: Config,
    publisher: Option<Arc<dyn EventPublisher>>,
    obs_config: ObservatoryConfig,
) -> Result<Self>
Open a graph with Observatory integration
§Examples
use llm_memory_graph::engine::AsyncMemoryGraph;
use llm_memory_graph::observatory::{ObservatoryConfig, InMemoryPublisher};
use llm_memory_graph::Config;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::default();
    let publisher = Arc::new(InMemoryPublisher::new());
    let obs_config = ObservatoryConfig::new().enabled();
    let graph = AsyncMemoryGraph::with_observatory(
        config,
        Some(publisher),
        obs_config
    ).await?;
    Ok(())
}
pub fn get_metrics(&self) -> Option<MetricsSnapshot>
Get metrics snapshot
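§Examples
A minimal sketch; it assumes the graph was opened with Observatory metrics enabled and that MetricsSnapshot implements Debug:
if let Some(snapshot) = graph.get_metrics() {
    // Debug formatting of the snapshot is an assumption for illustration.
    println!("{:?}", snapshot);
}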
pub async fn create_session(&self) -> Result<ConversationSession>
Create a new conversation session asynchronously
Sessions are used to group related prompts and responses together. Each session has a unique ID and can store custom metadata.
§Errors
Returns an error if the session cannot be persisted to storage.
§Examples
let session = graph.create_session().await?;
println!("Created session: {}", session.id);Sourcepub async fn create_session_with_metadata(
&self,
metadata: HashMap<String, String>,
) -> Result<ConversationSession>
pub async fn create_session_with_metadata( &self, metadata: HashMap<String, String>, ) -> Result<ConversationSession>
Create a session with custom metadata asynchronously
§Examples
use std::collections::HashMap;

let mut metadata = HashMap::new();
metadata.insert("user_id".to_string(), "123".to_string());
let session = graph.create_session_with_metadata(metadata).await?;
pub async fn get_session(
    &self,
    session_id: SessionId,
) -> Result<ConversationSession>
Get a session by ID asynchronously
This will first check the in-memory cache, then fall back to storage.
§Errors
Returns an error if the session doesn’t exist or storage retrieval fails.
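§Examples
A minimal sketch that round-trips a session through storage:
let session = graph.create_session().await?;
let fetched = graph.get_session(session.id).await?;
assert_eq!(fetched.id, session.id);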
pub async fn add_prompt(
    &self,
    session_id: SessionId,
    content: String,
    metadata: Option<PromptMetadata>,
) -> Result<NodeId>
Add a prompt node to a session asynchronously
§Examples
let prompt_id = graph.add_prompt(
    session.id,
    "Explain async/await in Rust".to_string(),
    None
).await?;
pub async fn add_prompts_batch(
    &self,
    prompts: Vec<(SessionId, String)>,
) -> Result<Vec<NodeId>>
Add multiple prompts concurrently (batch operation)
This method processes all prompts in parallel for maximum throughput.
§Examples
let prompts = vec![
(session.id, "First prompt".to_string()),
(session.id, "Second prompt".to_string()),
];
let ids = graph.add_prompts_batch(prompts).await?;
pub async fn add_response(
    &self,
    prompt_id: NodeId,
    content: String,
    token_usage: TokenUsage,
    metadata: Option<ResponseMetadata>,
) -> Result<NodeId>
Add a response node linked to a prompt asynchronously
§Examples
let usage = TokenUsage::new(10, 50);
let response_id = graph.add_response(
    prompt_id,
    "Async operations are non-blocking!".to_string(),
    usage,
    None
).await?;
pub async fn add_agent(&self, agent: AgentNode) -> Result<AgentId>
Add an agent node asynchronously
§Examples
let agent = AgentNode::new(
"CodeReviewer".to_string(),
"code-review".to_string(),
vec!["rust".to_string(), "python".to_string()]
);
let agent_id = graph.add_agent(agent).await?;
pub async fn update_agent(&self, agent: AgentNode) -> Result<()>
Update an existing agent asynchronously
This invalidates the cache entry for the agent to ensure consistency.
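§Examples
A minimal sketch; it assumes AgentNode implements Clone and that the agent value is modified between the two calls:
let agent = AgentNode::new(
    "CodeReviewer".to_string(),
    "code-review".to_string(),
    vec!["rust".to_string()]
);
graph.add_agent(agent.clone()).await?;
// ... mutate `agent` here (e.g. extend its capabilities), then persist:
graph.update_agent(agent).await?;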
pub async fn assign_agent_to_prompt(
    &self,
    prompt_id: NodeId,
    agent_node_id: NodeId,
) -> Result<()>
Assign an agent to handle a prompt asynchronously
Creates a HandledBy edge from the prompt to the agent.
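§Examples
A minimal sketch; `prompt_id` and `agent_node_id` are assumed to be the NodeIds of an existing prompt and agent node:
graph.assign_agent_to_prompt(prompt_id, agent_node_id).await?;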
pub async fn transfer_to_agent(
    &self,
    from_response: NodeId,
    to_agent_node_id: NodeId,
) -> Result<()>
Transfer from one agent to another asynchronously
Creates a TransfersTo edge representing agent handoff.
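§Examples
A minimal sketch; `response_id` and `next_agent_node_id` are assumed to identify an existing response and the receiving agent's node:
graph.transfer_to_agent(response_id, next_agent_node_id).await?;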
pub async fn create_template(
    &self,
    template: PromptTemplate,
) -> Result<TemplateId>
Create a new prompt template asynchronously
§Examples
let template = PromptTemplate::new(
"Greeting".to_string(),
"Hello {{name}}!".to_string(),
vec![]
);
let template_id = graph.create_template(template).await?;
pub async fn update_template(&self, template: PromptTemplate) -> Result<()>
Update an existing template asynchronously
This invalidates the cache entry for the template to ensure consistency.
pub async fn get_template(
    &self,
    template_id: TemplateId,
) -> Result<PromptTemplate>
Get a template by its template ID asynchronously
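§Examples
A minimal sketch that stores a template and fetches it back:
let template = PromptTemplate::new(
    "Greeting".to_string(),
    "Hello {{name}}!".to_string(),
    vec![]
);
let template_id = graph.create_template(template).await?;
let fetched = graph.get_template(template_id).await?;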
pub async fn get_template_by_node_id(
    &self,
    node_id: NodeId,
) -> Result<PromptTemplate>
Get a template by its node ID asynchronously
pub async fn create_template_from_parent(
    &self,
    template: PromptTemplate,
    parent_node_id: NodeId,
) -> Result<TemplateId>
Create a template that inherits from a parent template asynchronously
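§Examples
A minimal sketch; `parent_node_id` is assumed to be the NodeId of an existing template node:
let child = PromptTemplate::new(
    "FormalGreeting".to_string(),
    "Good day, {{name}}.".to_string(),
    vec![]
);
let child_id = graph.create_template_from_parent(child, parent_node_id).await?;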
pub async fn link_prompt_to_template(
    &self,
    prompt_id: NodeId,
    template_node_id: NodeId,
) -> Result<()>
Link a prompt to the template it was instantiated from
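§Examples
A minimal sketch; `prompt_id` and `template_node_id` are assumed to identify an existing prompt and the template node it was instantiated from:
graph.link_prompt_to_template(prompt_id, template_node_id).await?;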
pub async fn add_tool_invocation(&self, tool: ToolInvocation) -> Result<NodeId>
Add a tool invocation node asynchronously
§Examples
let tool = ToolInvocation::new(
    response_id,
    "calculator".to_string(),
    serde_json::json!({"operation": "add", "a": 2, "b": 3})
);
let tool_id = graph.add_tool_invocation(tool).await?;
pub async fn update_tool_invocation(&self, tool: ToolInvocation) -> Result<()>
Update tool invocation with results asynchronously
This invalidates the cache entry for the tool to ensure consistency.
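§Examples
A minimal sketch; it assumes ToolInvocation implements Clone, and the fields used to record results depend on the ToolInvocation type:
let tool = ToolInvocation::new(
    response_id,
    "calculator".to_string(),
    serde_json::json!({"operation": "add", "a": 2, "b": 3})
);
graph.add_tool_invocation(tool.clone()).await?;
// ... run the tool, record its result on the invocation value, then persist:
graph.update_tool_invocation(tool).await?;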
pub async fn get_node(&self, id: &NodeId) -> Result<Option<Node>>
Get a node by ID asynchronously (cache-aware)
This method first checks the cache for the node. If found in cache, it returns immediately (< 1ms latency). Otherwise, it loads from storage and populates the cache for future requests.
§Performance
- Cache hit: < 1ms latency
- Cache miss: 2-10ms latency (loads from storage)
§Examples
let node = graph.get_node(&node_id).await?;
pub async fn get_edge(&self, id: &EdgeId) -> Result<Option<Edge>>
Get an edge by ID asynchronously (cache-aware)
This method first checks the cache for the edge. If found in cache, it returns immediately. Otherwise, it loads from storage and populates the cache for future requests.
§Performance
- Cache hit: < 1ms latency
- Cache miss: 2-10ms latency (loads from storage)
pub async fn add_edge(
    &self,
    from: NodeId,
    to: NodeId,
    edge_type: EdgeType,
) -> Result<()>
Add a custom edge asynchronously
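§Examples
A minimal sketch; the EdgeType variant is an assumption for illustration (the docs above mention HandledBy and TransfersTo edges):
graph.add_edge(prompt_id, agent_node_id, EdgeType::HandledBy).await?;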
pub async fn get_outgoing_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>
Get all outgoing edges from a node asynchronously
pub async fn get_incoming_edges(&self, node_id: &NodeId) -> Result<Vec<Edge>>
Get all incoming edges to a node asynchronously
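§Examples
A minimal sketch inspecting both edge directions of a node:
let outgoing = graph.get_outgoing_edges(&prompt_id).await?;
let incoming = graph.get_incoming_edges(&prompt_id).await?;
println!("{} outgoing, {} incoming", outgoing.len(), incoming.len());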
pub async fn get_session_nodes(
    &self,
    session_id: &SessionId,
) -> Result<Vec<Node>>
Get all nodes in a session asynchronously
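§Examples
A minimal sketch, assuming a session populated earlier:
let nodes = graph.get_session_nodes(&session.id).await?;
println!("Session contains {} nodes", nodes.len());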
pub async fn store_nodes_batch(&self, nodes: Vec<Node>) -> Result<Vec<NodeId>>
Store multiple nodes concurrently asynchronously
This method leverages async concurrency to store multiple nodes in parallel.
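§Examples
A minimal sketch; `nodes` is assumed to be a Vec<Node> obtained elsewhere (for example, from get_session_nodes):
let ids = graph.store_nodes_batch(nodes).await?;
println!("Stored {} nodes", ids.len());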
pub async fn store_edges_batch(&self, edges: Vec<Edge>) -> Result<()>
Store multiple edges concurrently asynchronously
pub async fn add_responses_batch(
    &self,
    responses: Vec<(NodeId, String, TokenUsage)>,
) -> Result<Vec<NodeId>>
Add multiple responses concurrently (batch operation)
This method processes all responses in parallel for maximum throughput. Each response is linked to its corresponding prompt.
§Examples
let responses = vec![
(prompt1, "Answer 1".to_string(), TokenUsage::new(10, 50)),
(prompt2, "Answer 2".to_string(), TokenUsage::new(15, 60)),
];
let ids = graph.add_responses_batch(responses).await?;
pub async fn create_sessions_batch(
    &self,
    count: usize,
) -> Result<Vec<ConversationSession>>
Create multiple sessions concurrently (batch operation)
This method creates multiple sessions in parallel for maximum throughput.
§Examples
let sessions = graph.create_sessions_batch(5).await?;
assert_eq!(sessions.len(), 5);
pub async fn get_nodes_batch(
    &self,
    ids: Vec<NodeId>,
) -> Result<Vec<Option<Node>>>
Retrieve multiple nodes concurrently (batch operation)
This method fetches all nodes in parallel for maximum throughput. Returns nodes in the same order as the input IDs. Missing nodes are None.
§Examples
let ids = vec![id1, id2];
let nodes = graph.get_nodes_batch(ids).await?;
assert_eq!(nodes.len(), 2);
pub async fn delete_nodes_batch(&self, ids: Vec<NodeId>) -> Result<()>
Delete multiple nodes concurrently (batch operation)
This method deletes all nodes in parallel for maximum throughput. Note: this does not cascade-delete related edges; delete any related edges separately.
§Examples
let ids = vec![id1, id2];
graph.delete_nodes_batch(ids).await?;
pub async fn add_conversations_batch(
    &self,
    conversations: Vec<((SessionId, String), Option<(String, TokenUsage)>)>,
) -> Result<Vec<(NodeId, Option<NodeId>)>>
Process a mixed batch of prompts and responses concurrently
This advanced operation adds prompts and their responses in a single concurrent batch, which is useful for bulk-importing conversation data.
§Examples
let conversations = vec![
    (
        (session.id, "What is Rust?".to_string()),
        Some(("Rust is a systems programming language".to_string(), TokenUsage::new(5, 30))),
    ),
    (
        (session.id, "How does async work?".to_string()),
        Some(("Async in Rust is zero-cost".to_string(), TokenUsage::new(6, 25))),
    ),
];
let results = graph.add_conversations_batch(conversations).await?;
pub async fn stats(&self) -> Result<StorageStats>
Get storage statistics asynchronously
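§Examples
A minimal sketch; Debug formatting for StorageStats is an assumption for illustration:
let stats = graph.stats().await?;
println!("{:?}", stats);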
pub fn query(&self) -> AsyncQueryBuilder
Create a new async query builder for querying the graph
Returns an AsyncQueryBuilder that provides a fluent API for building
and executing queries with filtering, pagination, and streaming support.
§Examples
use llm_memory_graph::engine::AsyncMemoryGraph;
use llm_memory_graph::types::NodeType;
use llm_memory_graph::Config;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let graph = AsyncMemoryGraph::open(Config::default()).await?;
    let session = graph.create_session().await?;

    // Query with fluent API
    let prompts = graph.query()
        .session(session.id)
        .node_type(NodeType::Prompt)
        .limit(10)
        .execute()
        .await?;
    println!("Found {} prompts", prompts.len());
    Ok(())
}