use futures::stream::StreamExt;
use llm_memory_graph::{engine::AsyncMemoryGraph, types::NodeType, types::TokenUsage, Config};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== Async Streaming Queries Example ===\n");
let config = Config::new("./data/async_streaming_example.db");
let graph = AsyncMemoryGraph::open(config).await?;
println!("Creating session and populating with prompts...");
let session = graph.create_session().await?;
for i in 0..1000 {
let prompt_content = format!("Query {}: What is topic number {}?", i, i);
let prompt_id = graph.add_prompt(session.id, prompt_content, None).await?;
if i % 5 == 0 {
let response_content = format!("Answer to query {}", i);
let usage = TokenUsage::new(10, 50);
graph
.add_response(prompt_id, response_content, usage, None)
.await?;
}
}
println!("Created 1000 prompts and 200 responses\n");
println!("--- Example 1: Stream All Prompts ---");
let query = graph
.query()
.session(session.id)
.node_type(NodeType::Prompt);
let mut stream = query.execute_stream();
let mut count = 0;
println!("Streaming prompts one at a time (memory-efficient):");
while let Some(result) = stream.next().await {
let _node = result?;
count += 1;
if count % 100 == 0 {
println!(" Processed {} prompts...", count);
}
}
println!("Total prompts streamed: {}\n", count);
println!("--- Example 2: Paginated Queries ---");
let page_size = 50;
let mut page = 0;
loop {
let offset = page * page_size;
let results = graph
.query()
.session(session.id)
.node_type(NodeType::Prompt)
.offset(offset)
.limit(page_size)
.execute()
.await?;
if results.is_empty() {
break;
}
println!(
"Page {}: Retrieved {} prompts (offset: {})",
page,
results.len(),
offset
);
page += 1;
if page >= 3 {
println!(" ... (showing first 3 pages only)");
break;
}
}
println!();
println!("--- Example 3: Efficient Counting ---");
let prompt_count = graph
.query()
.session(session.id)
.node_type(NodeType::Prompt)
.count()
.await?;
println!("Total prompts (counted efficiently): {}", prompt_count);
let response_count = graph
.query()
.session(session.id)
.node_type(NodeType::Response)
.count()
.await?;
println!(
"Total responses (counted efficiently): {}\n",
response_count
);
println!("--- Example 4: Time Range Filtering ---");
use chrono::{Duration, Utc};
let now = Utc::now();
let one_minute_ago = now - Duration::minutes(1);
let recent_prompts = graph
.query()
.session(session.id)
.node_type(NodeType::Prompt)
.time_range(one_minute_ago, now)
.execute()
.await?;
println!(
"Prompts created in the last minute: {}",
recent_prompts.len()
);
println!("(Since we just created them, this should be 1000)\n");
println!("--- Example 5: Filtered Streaming ---");
let filtered_query = graph
.query()
.session(session.id)
.node_type(NodeType::Response)
.limit(50);
let mut filtered_stream = filtered_query.execute_stream();
let mut response_count_stream = 0;
while let Some(result) = filtered_stream.next().await {
let _node = result?;
response_count_stream += 1;
}
println!(
"Streamed {} responses (with limit 50)",
response_count_stream
);
println!();
println!("--- Example 6: Complex Filter Combinations ---");
let recent_count = graph
.query()
.session(session.id)
.node_type(NodeType::Prompt)
.time_range(one_minute_ago, now)
.count()
.await?;
println!("Recent prompts (last minute): {}", recent_count);
let first_page_responses = graph
.query()
.session(session.id)
.node_type(NodeType::Response)
.limit(10)
.execute()
.await?;
println!(
"First page of responses (10 items): {}",
first_page_responses.len()
);
println!();
println!("--- Example 7: Chunk Processing with Streaming ---");
let chunk_size = 100;
let mut processed = 0;
let query = graph
.query()
.session(session.id)
.node_type(NodeType::Prompt);
let mut stream = query.execute_stream();
let mut chunk = Vec::new();
while let Some(result) = stream.next().await {
let node = result?;
chunk.push(node);
if chunk.len() >= chunk_size {
processed += chunk.len();
println!(
" Processed chunk of {} items (total: {})",
chunk.len(),
processed
);
chunk.clear();
}
}
if !chunk.is_empty() {
processed += chunk.len();
println!(
" Processed final chunk of {} items (total: {})",
chunk.len(),
processed
);
}
println!();
println!("--- Cleanup ---");
println!("Flushing data to disk...");
graph.flush().await?;
let stats = graph.stats().await?;
println!("Final stats:");
println!(" Total nodes: {}", stats.node_count);
println!(" Total edges: {}", stats.edge_count);
println!("\n=== Example Complete ===");
println!("Key Takeaways:");
println!("1. Use execute_stream() for memory-efficient processing of large datasets");
println!("2. Use execute() for loading all results at once (with pagination)");
println!("3. Use count() to get totals without loading data");
println!(
"4. Combine filters (session, node_type, time_range, limit, offset) for precise queries"
);
println!("5. Process data in chunks to balance memory usage and performance");
println!("6. Leverage async/await for non-blocking I/O operations");
Ok(())
}