fcdb_exec/lib.rs

//! # Enishi Execution Engine
//!
//! Phase C: P10+ Adaptation - Advanced query optimization and adaptive systems
//!
//! Merkle DAG: enishi_exec -> adaptive_bloom, plan_switcher, meet_in_middle

use fcdb_core::{Cid, QKey, compute_path_sig, compute_class_sig};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, BTreeMap};
use std::sync::Arc;
use std::time::{Duration, Instant};
use bloom::{BloomFilter, ASMS};
use rand::prelude::*;
use statrs::distribution::{Normal, ContinuousCDF};

// ===== PHASE C: Adaptive Three-Tier Bloom Filters =====

/// Adaptive bloom filter configuration
#[derive(Clone, Debug)]
pub struct AdaptiveBloomConfig {
    pub target_fp_rate: f64,
    pub max_memory_mb: usize,
    pub adaptation_interval_secs: u64,
}

impl Default for AdaptiveBloomConfig {
    fn default() -> Self {
        Self {
            target_fp_rate: 1e-6, // Very low false positive rate
            max_memory_mb: 100,   // 100MB total
            adaptation_interval_secs: 300, // 5 minutes
        }
    }
}
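
// Illustrative override of the defaults (a sketch only; the numbers below are
// arbitrary, not tuned values from this crate): trade a looser false-positive
// target for a smaller memory budget and a faster adaptation cadence.
#[cfg(test)]
mod adaptive_bloom_config_sketch {
    use super::*;

    #[test]
    fn custom_config_overrides_defaults() {
        let cfg = AdaptiveBloomConfig {
            target_fp_rate: 1e-4,          // accept more false positives
            max_memory_mb: 16,             // smaller total budget
            adaptation_interval_secs: 60,  // re-tune every minute
        };
        assert!(cfg.target_fp_rate > AdaptiveBloomConfig::default().target_fp_rate);
        assert!(cfg.max_memory_mb < AdaptiveBloomConfig::default().max_memory_mb);
    }
}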

/// Three-tier Bloom filter system (Global/Pack/Shard)
pub struct AdaptiveBloomSystem {
    global: BloomFilter,
    pack_filters: HashMap<u32, BloomFilter>,
    shard_filters: HashMap<(u16, u64), BloomFilter>, // (type, time_bucket)

    // Statistics for adaptation
    global_fps: Vec<f64>,
    pack_fps: HashMap<u32, Vec<f64>>,
    shard_fps: HashMap<(u16, u64), Vec<f64>>,

    config: AdaptiveBloomConfig,
    last_adaptation: Instant,
}

impl AdaptiveBloomSystem {
    pub fn new(config: AdaptiveBloomConfig) -> Self {
        let initial_capacity = 1_000_000;
        Self {
            global: BloomFilter::with_rate(config.target_fp_rate as f32, initial_capacity as u32),
            pack_filters: HashMap::new(),
            shard_filters: HashMap::new(),
            global_fps: Vec::new(),
            pack_fps: HashMap::new(),
            shard_fps: HashMap::new(),
            config,
            last_adaptation: Instant::now(),
        }
    }

    /// Insert with type and time bucket for sharding
    pub fn insert(&mut self, cid: &Cid, pack_id: u32, type_part: u16, time_bucket: u64) {
        // Global filter
        self.global.insert(cid.as_bytes());

        // Pack filter
        self.pack_filters
            .entry(pack_id)
            .or_insert_with(|| BloomFilter::with_rate(1e-7, 100_000))
            .insert(cid.as_bytes());

        // Shard filter - adaptive creation
        let shard_key = (type_part, time_bucket);
        self.shard_filters
            .entry(shard_key)
            .or_insert_with(|| BloomFilter::with_rate(1e-8, 10_000))
            .insert(cid.as_bytes());
    }

    /// Query with hierarchical filtering
    pub fn contains(&self, cid: &Cid, pack_id: Option<u32>, shard: Option<(u16, u64)>) -> bool {
        // Check global first (fast rejection)
        if !self.global.contains(cid.as_bytes()) {
            return false;
        }

        // Check pack filter if specified
        if let Some(pack_id) = pack_id {
            if let Some(filter) = self.pack_filters.get(&pack_id) {
                if !filter.contains(cid.as_bytes()) {
                    return false;
                }
            }
        }

        // Check shard filter if specified
        if let Some((type_part, time_bucket)) = shard {
            if let Some(filter) = self.shard_filters.get(&(type_part, time_bucket)) {
                if !filter.contains(cid.as_bytes()) {
                    return false;
                }
            }
        }

        true
    }

    /// Record false positive for adaptation
    pub fn record_fp(&mut self, pack_id: Option<u32>, shard: Option<(u16, u64)>) {
        self.global_fps.push(1.0);

        if let Some(pack_id) = pack_id {
            self.pack_fps.entry(pack_id).or_insert_with(Vec::new).push(1.0);
        }

        if let Some(shard_key) = shard {
            self.shard_fps.entry(shard_key).or_insert_with(Vec::new).push(1.0);
        }

        // Trigger adaptation if interval passed
        if self.last_adaptation.elapsed() > Duration::from_secs(self.config.adaptation_interval_secs) {
            self.adapt_filters();
            self.last_adaptation = Instant::now();
        }
    }

    /// Adaptive filter reconfiguration
    fn adapt_filters(&mut self) {
        // Calculate current FP rates
        let global_fp_rate = self.global_fps.iter().sum::<f64>() / self.global_fps.len().max(1) as f64;

        // Redistribute memory based on FP rates and access patterns
        let total_memory = self.config.max_memory_mb * 1024 * 1024; // bytes

        // Global filter gets 40% baseline
        let global_memory = (total_memory as f64 * 0.4) as usize;

        // Pack filters get 40% distributed by access frequency
        let pack_memory = (total_memory as f64 * 0.4) as usize;

        // Shard filters get 20% distributed by FP rate
        let shard_memory = (total_memory as f64 * 0.2) as usize;

        // Reconfigure filters with new sizes
        self.reconfigure_filters(global_memory, pack_memory, shard_memory);

        // Reset statistics
        self.global_fps.clear();
        self.pack_fps.clear();
        self.shard_fps.clear();
    }

    fn reconfigure_filters(&mut self, global_mem: usize, pack_mem: usize, shard_mem: usize) {
        // Estimate optimal sizes based on memory and target FP rates
        // This is a simplified version - real implementation would use more sophisticated sizing
        let global_capacity = (global_mem / 16).min(10_000_000); // Rough estimate
        self.global = BloomFilter::with_rate(self.config.target_fp_rate as f32, global_capacity as u32);

        // Rebuild pack and shard filters with new sizing
        // (In practice, this would migrate existing data)
        self.pack_filters.clear();
        self.shard_filters.clear();
    }
}
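
// Usage sketch for the three-tier lookup path (hedged: `Cid` is constructed the
// same way as in the tests below, and the pack/type/time values are invented).
// A miss at any tier short-circuits to `false`; observed false positives are fed
// back through `record_fp` so the periodic adaptation pass can resize filters.
#[cfg(test)]
mod adaptive_bloom_usage_sketch {
    use super::*;

    #[test]
    fn three_tier_roundtrip() {
        let mut blooms = AdaptiveBloomSystem::new(AdaptiveBloomConfig::default());
        let cid = Cid([7u8; 32]);

        // Index the CID into the global, pack, and (type, time_bucket) shard filters.
        blooms.insert(&cid, /* pack_id */ 3, /* type_part */ 42, /* time_bucket */ 1_700_000_000);

        // Present at every tier it was inserted into...
        assert!(blooms.contains(&cid, Some(3), Some((42, 1_700_000_000))));
        // ...and a global-only probe also passes, since the global filter saw it.
        assert!(blooms.contains(&cid, None, None));

        // If a downstream lookup turns out to be a false positive, report it so
        // the adaptation pass can react once the interval elapses.
        blooms.record_fp(Some(3), Some((42, 1_700_000_000)));
    }
}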

// ===== PHASE C: Plan Switcher with ε-greedy =====

/// Query execution plan
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum QueryPlan {
    PathFirst(Vec<String>),        // Follow path first
    TypeFirst(Vec<String>),        // Filter by types first
    MeetInMiddle(String),          // Split query at midpoint
    IndexLookup(String),           // Direct index lookup
}

/// Plan performance statistics
#[derive(Clone, Debug)]
pub struct PlanStats {
    pub plan: QueryPlan,
    pub execution_time_ms: f64,
    pub result_count: usize,
    pub success: bool,
    pub timestamp: u64,
}

/// ε-greedy plan switcher
pub struct PlanSwitcher {
    plan_stats: HashMap<String, Vec<PlanStats>>, // query_key -> stats for that query
    epsilon: f64, // Exploration rate
    plan_timeout_ms: u64,
}

impl PlanSwitcher {
    pub fn new() -> Self {
        Self {
            plan_stats: HashMap::new(),
            epsilon: 0.1, // 10% exploration
            plan_timeout_ms: 1000, // 1 second timeout
        }
    }

    /// Choose best plan with ε-greedy exploration
    pub fn select_plan(&self, query_key: &str, available_plans: &[QueryPlan]) -> QueryPlan {
        if available_plans.is_empty() {
            return QueryPlan::PathFirst(vec![]);
        }

        // ε-greedy: explore or exploit
        if rand::random::<f64>() < self.epsilon {
            // Explore: random plan
            available_plans.choose(&mut rand::thread_rng()).unwrap().clone()
        } else {
            // Exploit: best performing plan
            self.select_best_plan(query_key, available_plans)
        }
    }

    fn select_best_plan(&self, query_key: &str, available_plans: &[QueryPlan]) -> QueryPlan {
        if let Some(stats) = self.plan_stats.get(query_key) {
            // Find plan with lowest average execution time
            let mut best_plan = &available_plans[0];
            let mut best_time = f64::INFINITY;

            for plan in available_plans {
                let plan_key = self.plan_key(plan);
                let avg_time = self.average_time_for_plan(&plan_key, stats);

                if avg_time < best_time {
                    best_time = avg_time;
                    best_plan = plan;
                }
            }

            best_plan.clone()
        } else {
            // No stats available, use first plan
            available_plans[0].clone()
        }
    }

    /// Record plan execution result
    pub fn record_result(&mut self, query_key: &str, plan: &QueryPlan, execution_time_ms: f64, result_count: usize, success: bool) {
        let stats = PlanStats {
            plan: plan.clone(),
            execution_time_ms,
            result_count,
            success,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs(),
        };

        self.plan_stats.entry(query_key.to_string())
            .or_insert_with(Vec::new)
            .push(stats);

        // Keep only recent stats (last 100 per query key)
        if let Some(stats_vec) = self.plan_stats.get_mut(query_key) {
            if stats_vec.len() > 100 {
                stats_vec.remove(0); // Remove oldest
            }
        }
    }

    fn plan_key(&self, plan: &QueryPlan) -> String {
        match plan {
            QueryPlan::PathFirst(path) => format!("path_first_{}", path.join("_")),
            QueryPlan::TypeFirst(types) => format!("type_first_{}", types.join("_")),
            QueryPlan::MeetInMiddle(split) => format!("meet_middle_{}", split),
            QueryPlan::IndexLookup(index) => format!("index_lookup_{}", index),
        }
    }

    fn average_time_for_plan(&self, plan_key: &str, all_stats: &[PlanStats]) -> f64 {
        let plan_stats: Vec<_> = all_stats.iter()
            .filter(|s| self.plan_key(&s.plan) == plan_key && s.success)
            .collect();

        if plan_stats.is_empty() {
            return f64::INFINITY;
        }

        plan_stats.iter().map(|s| s.execution_time_ms).sum::<f64>() / plan_stats.len() as f64
    }
}
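
// Feedback-loop sketch for the ε-greedy switcher (illustrative only; the query
// key and timings are invented). With exploration disabled the switcher must
// exploit the plan with the lowest recorded average latency for that query key.
#[cfg(test)]
mod plan_switcher_usage_sketch {
    use super::*;

    #[test]
    fn exploits_lowest_average_latency() {
        let mut switcher = PlanSwitcher::new();
        switcher.epsilon = 0.0; // pure exploitation for a deterministic assertion

        let plans = vec![
            QueryPlan::PathFirst(vec!["user".to_string(), "posts".to_string()]),
            QueryPlan::IndexLookup("by_author".to_string()),
        ];

        // Pretend the index lookup has consistently been ~4x faster.
        switcher.record_result("posts_by_author", &plans[0], 40.0, 10, true);
        switcher.record_result("posts_by_author", &plans[1], 10.0, 10, true);

        let chosen = switcher.select_plan("posts_by_author", &plans);
        assert!(matches!(chosen, QueryPlan::IndexLookup(_)));
    }
}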

// ===== PHASE C: Meet-in-the-middle Optimization =====

/// Meet-in-the-middle query splitter
pub struct MeetInMiddle {
    max_split_depth: usize,
    cost_estimator: CostEstimator,
}

impl MeetInMiddle {
    pub fn new() -> Self {
        Self {
            max_split_depth: 5,
            cost_estimator: CostEstimator::new(),
        }
    }

    /// Split complex query into two halves meeting in middle
    pub fn split_query(&self, query_path: &[&str], query_types: &[&str]) -> Option<QuerySplit> {
        if query_path.len() < 3 {
            return None; // Too simple for splitting
        }

        // Find optimal split point
        let path_len = query_path.len();
        let mut best_split = 0;
        let mut best_cost = f64::INFINITY;

        for split_point in 1..path_len {
            let left_cost = self.cost_estimator.estimate_cost(&query_path[0..split_point], &[]);
            let right_cost = self.cost_estimator.estimate_cost(&query_path[split_point..], &[]);
            let total_cost = left_cost + right_cost + 1.0; // Join cost

            if total_cost < best_cost {
                best_cost = total_cost;
                best_split = split_point;
            }
        }

        Some(QuerySplit {
            left_path: query_path[0..best_split].iter().map(|s| s.to_string()).collect(),
            right_path: query_path[best_split..].iter().map(|s| s.to_string()).collect(),
            join_key: query_path[best_split - 1].to_string(),
            estimated_cost: best_cost,
        })
    }
}

/// Query split result
#[derive(Clone, Debug)]
pub struct QuerySplit {
    pub left_path: Vec<String>,
    pub right_path: Vec<String>,
    pub join_key: String,
    pub estimated_cost: f64,
}

/// Cost estimation for query optimization
pub struct CostEstimator {
    // Simple cost model - in practice would be learned from execution stats
    pub base_selectivity: f64,
    pub path_expansion_factor: f64,
    pub type_filter_factor: f64,
}

impl CostEstimator {
    pub fn new() -> Self {
        Self {
            base_selectivity: 0.1,      // 10% selectivity baseline
            path_expansion_factor: 2.0,  // Per-step traversal cost (the model is linear in path length)
            type_filter_factor: 0.5,     // Type filters reduce by half
        }
    }

    pub fn estimate_cost(&self, path: &[&str], types: &[&str]) -> f64 {
        let path_cost = path.len() as f64 * self.path_expansion_factor;
        let type_cost = if types.is_empty() { 1.0 } else {
            types.len() as f64 * self.type_filter_factor
        };

        path_cost * type_cost * self.base_selectivity
    }
}
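
// Worked example of the default cost model (a sketch; the paths are invented).
// With the defaults, cost = path_len * 2.0 * type_cost * 0.1, where type_cost is
// 1.0 without type filters and 0.5 for a single type filter, so a 3-step untyped
// path costs 3 * 2.0 * 1.0 * 0.1 = 0.6 and adding one type filter halves it to 0.3.
#[cfg(test)]
mod cost_estimator_sketch {
    use super::*;

    #[test]
    fn default_model_arithmetic() {
        let est = CostEstimator::new();
        let path = ["user", "posts", "comments"];

        assert!((est.estimate_cost(&path, &[]) - 0.6).abs() < 1e-9);
        assert!((est.estimate_cost(&path, &["Comment"]) - 0.3).abs() < 1e-9);

        // The splitter uses the same model: left + right + 1.0 join cost. For an
        // untyped 4-step path every split point totals 4 * 0.2 + 1.0 = 1.8.
        let split = MeetInMiddle::new()
            .split_query(&["user", "posts", "comments", "replies"], &[])
            .expect("paths of length >= 3 are splittable");
        assert!((split.estimated_cost - 1.8).abs() < 1e-9);
    }
}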

// ===== PHASE C: Snapshot CID for Popular Temporal Points =====

/// Snapshot manager for popular as_of points
pub struct SnapshotManager {
    snapshots: BTreeMap<u64, Cid>, // timestamp -> snapshot_cid
    access_counts: HashMap<u64, u64>, // timestamp -> access_count
    max_snapshots: usize,
    snapshot_interval: u64, // seconds
}

impl SnapshotManager {
    pub fn new(max_snapshots: usize) -> Self {
        Self {
            snapshots: BTreeMap::new(),
            access_counts: HashMap::new(),
            max_snapshots,
            snapshot_interval: 3600, // 1 hour
        }
    }

    /// Look up the closest snapshot at or before `as_of`, recording the access
    pub fn get_snapshot(&mut self, as_of: u64) -> Option<Cid> {
        // Record access
        *self.access_counts.entry(as_of).or_insert(0) += 1;

        // Find closest snapshot
        self.snapshots.range(..=as_of)
            .next_back()
            .map(|(_, cid)| *cid)
    }

    /// Create new snapshot at timestamp
    pub fn create_snapshot(&mut self, as_of: u64, data_cid: Cid) {
        // Remove old snapshots if over limit
        while self.snapshots.len() >= self.max_snapshots {
            if let Some(oldest_ts) = self.snapshots.iter().next().map(|(k, _)| *k) {
                self.snapshots.remove(&oldest_ts);
                self.access_counts.remove(&oldest_ts);
            } else {
                break; // Nothing left to evict (guards against max_snapshots == 0)
            }
        }

        self.snapshots.insert(as_of, data_cid);
    }

    /// Get popular snapshot timestamps for precomputation
    pub fn get_popular_timestamps(&self, top_k: usize) -> Vec<u64> {
        let mut popular: Vec<(u64, u64)> = self.access_counts.iter()
            .map(|(ts, count)| (*ts, *count))
            .collect();

        popular.sort_by(|a, b| b.1.cmp(&a.1)); // Sort by access count descending

        popular.into_iter()
            .take(top_k)
            .map(|(ts, _)| ts)
            .collect()
    }
}
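
// Eviction sketch (illustrative; timestamps and CIDs are invented): with
// `max_snapshots = 2`, creating a third snapshot evicts the oldest entry, and
// `get_snapshot` keeps resolving to the closest snapshot at or before `as_of`.
#[cfg(test)]
mod snapshot_manager_sketch {
    use super::*;

    #[test]
    fn evicts_oldest_and_resolves_floor() {
        let mut manager = SnapshotManager::new(2);
        manager.create_snapshot(100, Cid([1u8; 32]));
        manager.create_snapshot(200, Cid([2u8; 32]));
        manager.create_snapshot(300, Cid([3u8; 32])); // evicts the t=100 snapshot

        assert_eq!(manager.get_snapshot(99), None); // nothing at or before t=99
        assert_eq!(manager.get_snapshot(250), Some(Cid([2u8; 32])));
        assert_eq!(manager.get_snapshot(10_000), Some(Cid([3u8; 32])));
    }
}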

// ===== PHASE C: SIMD VarInt (Placeholder for future SIMD implementation) =====

/// SIMD-accelerated VarInt encoding/decoding
pub mod simd_varint {
    use fcdb_core::varint;

    /// SIMD VarInt encoder (placeholder - would use SIMD instructions)
    pub fn encode_simd(values: &[u64]) -> Vec<u8> {
        let mut result = Vec::new();
        for &value in values {
            varint::encode_u64(value, &mut result);
        }
        result
    }

    /// SIMD VarInt decoder (placeholder)
    pub fn decode_simd(data: &[u8]) -> Vec<u64> {
        let mut result = Vec::new();
        let mut reader = data;
        while !reader.is_empty() {
            if let Ok(value) = varint::decode_u64(&mut reader) {
                result.push(value);
            } else {
                break;
            }
        }
        result
    }
}
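
// Round-trip sketch for the scalar fallback (hedged: this only exercises the
// `fcdb_core::varint` calls already used above and assumes they round-trip).
#[cfg(test)]
mod simd_varint_sketch {
    use super::simd_varint;

    #[test]
    fn encode_decode_roundtrip() {
        let values = [0u64, 1, 127, 128, 300, 1_000_000];
        let encoded = simd_varint::encode_simd(&values);
        assert_eq!(simd_varint::decode_simd(&encoded), values);
    }
}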
465
466#[cfg(test)]
467mod tests {
468    use super::*;
469
470    #[test]
471    fn test_adaptive_bloom() {
472        let mut bloom = AdaptiveBloomSystem::new(AdaptiveBloomConfig::default());
473        let cid = Cid([1u8; 32]);
474
475        bloom.insert(&cid, 1, 100, 1234567890);
476        assert!(bloom.contains(&cid, Some(1), Some((100, 1234567890))));
477        assert!(!bloom.contains(&Cid([2u8; 32]), None, None));
478    }
479
    #[test]
    fn test_plan_switcher() {
        let mut switcher = PlanSwitcher::new();

        let plans = vec![
            QueryPlan::PathFirst(vec!["user".to_string()]),
            QueryPlan::TypeFirst(vec!["User".to_string()]),
        ];

        let selected = switcher.select_plan("test_query", &plans);
        // Should select one of the available plans
        assert!(matches!(selected, QueryPlan::PathFirst(_) | QueryPlan::TypeFirst(_)));

        // Record a result
        switcher.record_result("test_query", &plans[0], 10.0, 100, true);

        // Disable exploration so the preference check below is deterministic
        switcher.epsilon = 0.0;

        // Should prefer the faster plan on subsequent calls
        let selected2 = switcher.select_plan("test_query", &plans);
        assert!(matches!(selected2, QueryPlan::PathFirst(_)));
    }

    #[test]
    fn test_meet_in_middle() {
        let mim = MeetInMiddle::new();
        let query_path = &["user", "posts", "comments", "replies"];

        if let Some(split) = mim.split_query(query_path, &[]) {
            assert_eq!(split.left_path.len() + split.right_path.len(), query_path.len());
            assert!(split.estimated_cost > 0.0);
        } else {
            panic!("Should split this query");
        }
    }

    #[test]
    fn test_snapshot_manager() {
        let mut manager = SnapshotManager::new(10);
        let cid = Cid([42u8; 32]);

        manager.create_snapshot(1000, cid);
        assert_eq!(manager.get_snapshot(1000), Some(cid));
        assert_eq!(manager.get_snapshot(1500), Some(cid)); // Should find closest

        let popular = manager.get_popular_timestamps(5);
        assert!(!popular.is_empty());
    }
}