// examples/cache_flow/cache_flow.rs

//! Cache Flow Example
//!
//! Demonstrates the caching layer features:
//! - Node result caching
//! - Cache invalidation strategies (LRU, FIFO, LFU)
//! - TTL support with automatic expiration
//! - Memory limits with automatic eviction
//! - Cache statistics and monitoring
//!
//! Run with:
//! ```bash
//! cargo run --example cache_flow
//! ```

use rust_logic_graph::{
    cache::{CacheConfig, CacheManager, EvictionPolicy},
    rule::RuleError,
    Context, Executor, Graph, GraphDef, Node, NodeType,
};
use serde_json::json;
use std::time::Duration;
use tracing::{info, Level};
use tracing_subscriber;
25/// Custom node that simulates expensive computation
26struct ExpensiveComputeNode {
27    id: String,
28    computation_time_ms: u64,
29}
30
31impl ExpensiveComputeNode {
32    fn new(id: impl Into<String>, computation_time_ms: u64) -> Self {
33        Self {
34            id: id.into(),
35            computation_time_ms,
36        }
37    }
38}
40#[async_trait::async_trait]
41impl Node for ExpensiveComputeNode {
42    fn id(&self) -> &str {
43        &self.id
44    }
45
46    fn node_type(&self) -> NodeType {
47        NodeType::RuleNode
48    }
49
50    async fn run(&self, context: &mut Context) -> Result<serde_json::Value, RuleError> {
51        info!(
52            "Node '{}': Starting expensive computation ({} ms)...",
53            self.id, self.computation_time_ms
54        );
55
56        // Simulate expensive computation
57        tokio::time::sleep(Duration::from_millis(self.computation_time_ms)).await;
58
59        // Get input value
60        let input = context
61            .data
62            .get("input")
63            .and_then(|v| v.as_i64())
64            .unwrap_or(0);
65
66        // Compute result (factorial)
67        let result = (1..=input).product::<i64>();
68
69        // Store in context
70        context.data.insert(
71            format!("{}_result", self.id),
72            json!(result)
73        );
74
75        info!(
76            "Node '{}': Computation complete. Result: {}",
77            self.id, result
78        );
79
80        Ok(json!(result))
81    }
82}
84async fn demo_basic_caching() -> anyhow::Result<()> {
85    println!("\n=== Demo 1: Basic Caching ===\n");
86
87    // Create cache with default configuration
88    let cache_config = CacheConfig {
89        max_entries: 100,
90        max_memory_bytes: 10 * 1024 * 1024, // 10MB
91        default_ttl: None,                   // No expiration
92        eviction_policy: EvictionPolicy::LRU,
93        enable_background_cleanup: false,
94    };
95
96    let cache = CacheManager::new(cache_config).await?;
97
98    // Create executor with cache
99    let mut executor = Executor::with_cache(cache.clone());
100
101    // Register expensive computation node
102    executor.register_node(Box::new(ExpensiveComputeNode::new("compute", 1000)));
103
104    // Create simple graph
105    let graph_def = GraphDef {
106        nodes: vec![("compute".to_string(), NodeType::RuleNode)]
107            .into_iter()
108            .collect(),
109        edges: vec![],
110    };
111
112    // First execution - cache miss
113    info!("First execution (cache miss):");
114    let mut graph1 = Graph::new(graph_def.clone());
115    graph1.context.set("input", json!(5))?;
116
117    let start = std::time::Instant::now();
118    executor.execute(&mut graph1).await?;
119    let duration1 = start.elapsed();
120
121    println!("First execution time: {:?}", duration1);
122    println!("Cache stats: {:?}", cache.stats());
123
124    // Second execution - cache hit
125    info!("\nSecond execution (cache hit):");
126    let mut graph2 = Graph::new(graph_def.clone());
127    graph2.context.set("input", json!(5))?; // Same input
128
129    let start = std::time::Instant::now();
130    executor.execute(&mut graph2).await?;
131    let duration2 = start.elapsed();
132
133    println!("Second execution time: {:?}", duration2);
134    println!("Cache stats: {:?}", cache.stats());
135    println!("Speedup: {:.2}x", duration1.as_secs_f64() / duration2.as_secs_f64());
136
137    // Third execution with different input - cache miss
138    info!("\nThird execution with different input (cache miss):");
139    let mut graph3 = Graph::new(graph_def);
140    graph3.context.set("input", json!(10))?; // Different input
141
142    let start = std::time::Instant::now();
143    executor.execute(&mut graph3).await?;
144    let duration3 = start.elapsed();
145
146    println!("Third execution time: {:?}", duration3);
147    println!("Final cache stats: {:?}", cache.stats());
148    println!("Hit rate: {:.1}%", cache.stats().hit_rate());
149
150    Ok(())
151}
153async fn demo_ttl_expiration() -> anyhow::Result<()> {
154    println!("\n\n=== Demo 2: TTL Expiration ===\n");
155
156    // Create cache with short TTL
157    let cache_config = CacheConfig {
158        max_entries: 100,
159        max_memory_bytes: 10 * 1024 * 1024,
160        default_ttl: Some(Duration::from_secs(2)), // 2 second TTL
161        eviction_policy: EvictionPolicy::LRU,
162        enable_background_cleanup: true,
163    };
164
165    let cache = CacheManager::new(cache_config).await?;
166    let mut executor = Executor::with_cache(cache.clone());
167    executor.register_node(Box::new(ExpensiveComputeNode::new("compute", 500)));
168
169    let graph_def = GraphDef {
170        nodes: vec![("compute".to_string(), NodeType::RuleNode)]
171            .into_iter()
172            .collect(),
173        edges: vec![],
174    };
175
176    // First execution
177    info!("First execution:");
178    let mut graph1 = Graph::new(graph_def.clone());
179    graph1.context.set("input", json!(7))?;
180    executor.execute(&mut graph1).await?;
181    println!("Cache stats after first execution: {:?}", cache.stats());
182
183    // Immediate second execution - should hit cache
184    info!("\nImmediate second execution (within TTL):");
185    let mut graph2 = Graph::new(graph_def.clone());
186    graph2.context.set("input", json!(7))?;
187    executor.execute(&mut graph2).await?;
188    println!("Cache stats: {:?}", cache.stats());
189
190    // Wait for TTL to expire
191    println!("\nWaiting for TTL to expire (3 seconds)...");
192    tokio::time::sleep(Duration::from_secs(3)).await;
193
194    // Third execution after TTL - should miss cache
195    info!("\nThird execution (after TTL expiration):");
196    let mut graph3 = Graph::new(graph_def);
197    graph3.context.set("input", json!(7))?;
198    executor.execute(&mut graph3).await?;
199    println!("Cache stats after expiration: {:?}", cache.stats());
200
201    Ok(())
202}
204async fn demo_eviction_policies() -> anyhow::Result<()> {
205    println!("\n\n=== Demo 3: Eviction Policies ===\n");
206
207    for policy in [EvictionPolicy::LRU, EvictionPolicy::FIFO, EvictionPolicy::LFU] {
208        println!("\n--- Testing {:?} Policy ---", policy);
209
210        let cache_config = CacheConfig {
211            max_entries: 3, // Small limit to trigger eviction
212            max_memory_bytes: 10 * 1024 * 1024,
213            default_ttl: None,
214            eviction_policy: policy,
215            enable_background_cleanup: false,
216        };
217
218        let cache = CacheManager::new(cache_config).await?;
219        let mut executor = Executor::with_cache(cache.clone());
220        executor.register_node(Box::new(ExpensiveComputeNode::new("compute", 100)));
221
222        let graph_def = GraphDef {
223            nodes: vec![("compute".to_string(), NodeType::RuleNode)]
224                .into_iter()
225                .collect(),
226            edges: vec![],
227        };
228
229        // Add 4 entries (should evict 1)
230        for i in 1..=4 {
231            let mut graph = Graph::new(graph_def.clone());
232            graph.context.set("input", json!(i))?;
233            executor.execute(&mut graph).await?;
234            println!("Added entry {}, cache size: {}", i, cache.len());
235        }
236
237        let stats = cache.stats();
238        println!("Final stats: entries={}, evictions={}", 
239                 stats.current_entries, stats.evictions);
240    }
241
242    Ok(())
243}
245async fn demo_memory_limits() -> anyhow::Result<()> {
246    println!("\n\n=== Demo 4: Memory Limits ===\n");
247
248    // Create cache with small memory limit
249    let cache_config = CacheConfig {
250        max_entries: 1000,
251        max_memory_bytes: 1024, // Only 1KB
252        default_ttl: None,
253        eviction_policy: EvictionPolicy::LRU,
254        enable_background_cleanup: false,
255    };
256
257    let cache = CacheManager::new(cache_config).await?;
258    let mut executor = Executor::with_cache(cache.clone());
259    executor.register_node(Box::new(ExpensiveComputeNode::new("compute", 50)));
260
261    let graph_def = GraphDef {
262        nodes: vec![("compute".to_string(), NodeType::RuleNode)]
263            .into_iter()
264            .collect(),
265        edges: vec![],
266    };
267
268    // Add entries until memory limit is reached
269    for i in 1..=20 {
270        let mut graph = Graph::new(graph_def.clone());
271        graph.context.set("input", json!(i))?;
272        graph.context.set("large_data", json!(vec![i; 100]))?; // Add some bulk
273        executor.execute(&mut graph).await?;
274
275        let stats = cache.stats();
276        println!(
277            "Entry {}: size={} entries, memory={} bytes, evictions={}",
278            i, stats.current_entries, stats.current_memory_bytes, stats.evictions
279        );
280    }
281
282    let stats = cache.stats();
283    println!("\nFinal memory usage: {} bytes (limit: 1024 bytes)", 
284             stats.current_memory_bytes);
285    println!("Total evictions: {}", stats.evictions);
286
287    Ok(())
288}
290async fn demo_cache_invalidation() -> anyhow::Result<()> {
291    println!("\n\n=== Demo 5: Cache Invalidation ===\n");
292
293    let cache = CacheManager::new(CacheConfig::default()).await?;
294    let mut executor = Executor::with_cache(cache.clone());
295    executor.register_node(Box::new(ExpensiveComputeNode::new("compute", 200)));
296
297    let graph_def = GraphDef {
298        nodes: vec![("compute".to_string(), NodeType::RuleNode)]
299            .into_iter()
300            .collect(),
301        edges: vec![],
302    };
303
304    // Execute multiple times with different inputs
305    for i in 1..=5 {
306        let mut graph = Graph::new(graph_def.clone());
307        graph.context.set("input", json!(i))?;
308        executor.execute(&mut graph).await?;
309    }
310
311    println!("Cache populated with {} entries", cache.len());
312    println!("Cache stats: {:?}", cache.stats());
313
314    // Invalidate specific node
315    println!("\nInvalidating all entries for node 'compute'...");
316    let invalidated = cache.invalidate_node("compute");
317    println!("Invalidated {} entries", invalidated);
318    println!("Cache size after invalidation: {}", cache.len());
319
320    // Re-execute - should be cache miss
321    info!("\nRe-executing after invalidation:");
322    let mut graph = Graph::new(graph_def);
323    graph.context.set("input", json!(1))?;
324    executor.execute(&mut graph).await?;
325    println!("Final cache stats: {:?}", cache.stats());
326
327    Ok(())
328}
330#[tokio::main]
331async fn main() -> anyhow::Result<()> {
332    // Initialize tracing
333    tracing_subscriber::fmt()
334        .with_max_level(Level::INFO)
335        .with_target(false)
336        .init();
337
338    println!("╔══════════════════════════════════════════════════════════╗");
339    println!("║         Rust Logic Graph - Cache Layer Demo             ║");
340    println!("╚══════════════════════════════════════════════════════════╝");
341
342    demo_basic_caching().await?;
343    demo_ttl_expiration().await?;
344    demo_eviction_policies().await?;
345    demo_memory_limits().await?;
346    demo_cache_invalidation().await?;
347
348    println!("\n\n✅ All cache demos completed successfully!");
349    println!("\nKey Features Demonstrated:");
350    println!("  ✓ Node result caching with automatic key generation");
351    println!("  ✓ Cache hit/miss tracking and statistics");
352    println!("  ✓ TTL-based expiration with background cleanup");
353    println!("  ✓ Multiple eviction policies (LRU, FIFO, LFU)");
354    println!("  ✓ Memory limits with automatic eviction");
355    println!("  ✓ Manual cache invalidation");
356
357    Ok(())
358}