1use fcdb_core::{Cid, QKey, compute_path_sig, compute_class_sig};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, BTreeMap};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use bloom::{BloomFilter, ASMS};
13use rand::prelude::*;
14use statrs::distribution::{Normal, ContinuousCDF};
15
/// Tuning parameters for [`AdaptiveBloomSystem`].
#[derive(Debug, Clone)]
pub struct AdaptiveBloomConfig {
    /// Target false-positive rate of the global filter (narrowed to `f32` when
    /// the filter is constructed).
    pub target_fp_rate: f64,
    /// Memory budget, expressed in MiB.
    pub max_memory_mb: usize,
    /// Minimum number of seconds between two adaptation passes.
    pub adaptation_interval_secs: u64,
}
25
26impl Default for AdaptiveBloomConfig {
27 fn default() -> Self {
28 Self {
29 target_fp_rate: 1e-6, max_memory_mb: 100, adaptation_interval_secs: 300, }
33 }
34}
35
36pub struct AdaptiveBloomSystem {
38 global: BloomFilter,
39 pack_filters: HashMap<u32, BloomFilter>,
40 shard_filters: HashMap<(u16, u64), BloomFilter>, global_fps: Vec<f64>,
44 pack_fps: HashMap<u32, Vec<f64>>,
45 shard_fps: HashMap<(u16, u64), Vec<f64>>,
46
47 config: AdaptiveBloomConfig,
48 last_adaptation: Instant,
49}
50
51impl AdaptiveBloomSystem {
52 pub fn new(config: AdaptiveBloomConfig) -> Self {
53 let initial_capacity = 1_000_000;
54 Self {
55 global: BloomFilter::with_rate(config.target_fp_rate as f32, initial_capacity as u32),
56 pack_filters: HashMap::new(),
57 shard_filters: HashMap::new(),
58 global_fps: Vec::new(),
59 pack_fps: HashMap::new(),
60 shard_fps: HashMap::new(),
61 config,
62 last_adaptation: Instant::now(),
63 }
64 }
65
66 pub fn insert(&mut self, cid: &Cid, pack_id: u32, type_part: u16, time_bucket: u64) {
68 self.global.insert(cid.as_bytes());
70
71 self.pack_filters
73 .entry(pack_id)
74 .or_insert_with(|| BloomFilter::with_rate(1e-7, 100_000))
75 .insert(cid.as_bytes());
76
77 let shard_key = (type_part, time_bucket);
79 self.shard_filters
80 .entry(shard_key)
81 .or_insert_with(|| BloomFilter::with_rate(1e-8, 10_000))
82 .insert(cid.as_bytes());
83 }
84
85 pub fn contains(&self, cid: &Cid, pack_id: Option<u32>, shard: Option<(u16, u64)>) -> bool {
87 if !self.global.contains(cid.as_bytes()) {
89 return false;
90 }
91
92 if let Some(pack_id) = pack_id {
94 if let Some(filter) = self.pack_filters.get(&pack_id) {
95 if !filter.contains(cid.as_bytes()) {
96 return false;
97 }
98 }
99 }
100
101 if let Some((type_part, time_bucket)) = shard {
103 if let Some(filter) = self.shard_filters.get(&(type_part, time_bucket)) {
104 if !filter.contains(cid.as_bytes()) {
105 return false;
106 }
107 }
108 }
109
110 true
111 }
112
113 pub fn record_fp(&mut self, pack_id: Option<u32>, shard: Option<(u16, u64)>) {
115 self.global_fps.push(1.0);
116
117 if let Some(pack_id) = pack_id {
118 self.pack_fps.entry(pack_id).or_insert_with(Vec::new).push(1.0);
119 }
120
121 if let Some(shard_key) = shard {
122 self.shard_fps.entry(shard_key).or_insert_with(Vec::new).push(1.0);
123 }
124
125 if self.last_adaptation.elapsed() > Duration::from_secs(self.config.adaptation_interval_secs) {
127 self.adapt_filters();
128 self.last_adaptation = Instant::now();
129 }
130 }
131
132 fn adapt_filters(&mut self) {
134 let global_fp_rate = self.global_fps.iter().sum::<f64>() / self.global_fps.len().max(1) as f64;
136
137 let total_memory = self.config.max_memory_mb * 1024 * 1024; let global_memory = (total_memory as f64 * 0.4) as usize;
142
143 let pack_memory = (total_memory as f64 * 0.4) as usize;
145
146 let shard_memory = (total_memory as f64 * 0.2) as usize;
148
149 self.reconfigure_filters(global_memory, pack_memory, shard_memory);
151
152 self.global_fps.clear();
154 self.pack_fps.clear();
155 self.shard_fps.clear();
156 }
157
158 fn reconfigure_filters(&mut self, global_mem: usize, pack_mem: usize, shard_mem: usize) {
159 let global_capacity = (global_mem / 16).min(10_000_000); self.global = BloomFilter::with_rate(self.config.target_fp_rate as f32, global_capacity as u32);
163
164 self.pack_filters.clear();
167 self.shard_filters.clear();
168 }
169}
170
171#[derive(Clone, Debug, Serialize, Deserialize)]
175pub enum QueryPlan {
176 PathFirst(Vec<String>), TypeFirst(Vec<String>), MeetInMiddle(String), IndexLookup(String), }
181
182#[derive(Clone, Debug)]
184pub struct PlanStats {
185 pub plan: QueryPlan,
186 pub execution_time_ms: f64,
187 pub result_count: usize,
188 pub success: bool,
189 pub timestamp: u64,
190}
191
192pub struct PlanSwitcher {
194 plan_stats: HashMap<String, Vec<PlanStats>>, epsilon: f64, plan_timeout_ms: u64,
197}
198
199impl PlanSwitcher {
200 pub fn new() -> Self {
201 Self {
202 plan_stats: HashMap::new(),
203 epsilon: 0.1, plan_timeout_ms: 1000, }
206 }
207
208 pub fn select_plan(&self, query_key: &str, available_plans: &[QueryPlan]) -> QueryPlan {
210 if available_plans.is_empty() {
211 return QueryPlan::PathFirst(vec![]);
212 }
213
214 if rand::random::<f64>() < self.epsilon {
216 available_plans.choose(&mut rand::thread_rng()).unwrap().clone()
218 } else {
219 self.select_best_plan(query_key, available_plans)
221 }
222 }
223
224 fn select_best_plan(&self, query_key: &str, available_plans: &[QueryPlan]) -> QueryPlan {
225 if let Some(stats) = self.plan_stats.get(query_key) {
226 let mut best_plan = &available_plans[0];
228 let mut best_time = f64::INFINITY;
229
230 for plan in available_plans {
231 let plan_key = self.plan_key(plan);
232 let avg_time = self.average_time_for_plan(&plan_key, stats);
233
234 if avg_time < best_time {
235 best_time = avg_time;
236 best_plan = plan;
237 }
238 }
239
240 best_plan.clone()
241 } else {
242 available_plans[0].clone()
244 }
245 }
246
247 pub fn record_result(&mut self, query_key: &str, plan: &QueryPlan, execution_time_ms: f64, result_count: usize, success: bool) {
249 let stats = PlanStats {
250 plan: plan.clone(),
251 execution_time_ms,
252 result_count,
253 success,
254 timestamp: std::time::SystemTime::now()
255 .duration_since(std::time::UNIX_EPOCH)
256 .unwrap()
257 .as_secs(),
258 };
259
260 self.plan_stats.entry(query_key.to_string())
261 .or_insert_with(Vec::new)
262 .push(stats);
263
264 if let Some(stats_vec) = self.plan_stats.get_mut(query_key) {
266 if stats_vec.len() > 100 {
267 stats_vec.remove(0); }
269 }
270 }
271
272 fn plan_key(&self, plan: &QueryPlan) -> String {
273 match plan {
274 QueryPlan::PathFirst(path) => format!("path_first_{}", path.join("_")),
275 QueryPlan::TypeFirst(types) => format!("type_first_{}", types.join("_")),
276 QueryPlan::MeetInMiddle(split) => format!("meet_middle_{}", split),
277 QueryPlan::IndexLookup(index) => format!("index_lookup_{}", index),
278 }
279 }
280
281 fn average_time_for_plan(&self, plan_key: &str, all_stats: &[PlanStats]) -> f64 {
282 let plan_stats: Vec<_> = all_stats.iter()
283 .filter(|s| self.plan_key(&s.plan) == plan_key && s.success)
284 .collect();
285
286 if plan_stats.is_empty() {
287 return f64::INFINITY;
288 }
289
290 plan_stats.iter().map(|s| s.execution_time_ms).sum::<f64>() / plan_stats.len() as f64
291 }
292}
293
294pub struct MeetInMiddle {
298 max_split_depth: usize,
299 cost_estimator: CostEstimator,
300}
301
302impl MeetInMiddle {
303 pub fn new() -> Self {
304 Self {
305 max_split_depth: 5,
306 cost_estimator: CostEstimator::new(),
307 }
308 }
309
310 pub fn split_query(&self, query_path: &[&str], query_types: &[&str]) -> Option<QuerySplit> {
312 if query_path.len() < 3 {
313 return None; }
315
316 let path_len = query_path.len();
318 let mut best_split = 0;
319 let mut best_cost = f64::INFINITY;
320
321 for split_point in 1..path_len {
322 let left_cost = self.cost_estimator.estimate_cost(&query_path[0..split_point], &[]);
323 let right_cost = self.cost_estimator.estimate_cost(&query_path[split_point..], &[]);
324 let total_cost = left_cost + right_cost + 1.0; if total_cost < best_cost {
327 best_cost = total_cost;
328 best_split = split_point;
329 }
330 }
331
332 Some(QuerySplit {
333 left_path: query_path[0..best_split].iter().map(|s| s.to_string()).collect(),
334 right_path: query_path[best_split..].iter().map(|s| s.to_string()).collect(),
335 join_key: query_path[best_split - 1].to_string(),
336 estimated_cost: best_cost,
337 })
338 }
339}
340
/// Result of splitting a query path: the two halves, the segment at which they
/// are joined, and the estimated cost of the split.
#[derive(Debug, Clone)]
pub struct QuerySplit {
    /// Leading path segments.
    pub left_path: Vec<String>,
    /// Remaining path segments.
    pub right_path: Vec<String>,
    /// Segment at which the halves are joined.
    pub join_key: String,
    /// Estimated cost of evaluating both halves plus the join.
    pub estimated_cost: f64,
}
349
/// Linear cost model for path queries:
/// `len(path) * path_expansion_factor * type_term * base_selectivity`.
pub struct CostEstimator {
    /// Final multiplier applied to every estimate.
    pub base_selectivity: f64,
    /// Cost contributed per path segment.
    pub path_expansion_factor: f64,
    /// Per-type multiplier: the type term is `len(types) * type_filter_factor`
    /// when any types are given and `1.0` otherwise.
    pub type_filter_factor: f64,
}
357
358impl CostEstimator {
359 pub fn new() -> Self {
360 Self {
361 base_selectivity: 0.1, path_expansion_factor: 2.0, type_filter_factor: 0.5, }
365 }
366
367 pub fn estimate_cost(&self, path: &[&str], types: &[&str]) -> f64 {
368 let path_cost = path.len() as f64 * self.path_expansion_factor;
369 let type_cost = if types.is_empty() { 1.0 } else {
370 types.len() as f64 * self.type_filter_factor
371 };
372
373 path_cost * type_cost * self.base_selectivity
374 }
375}
376
377pub struct SnapshotManager {
381 snapshots: BTreeMap<u64, Cid>, access_counts: HashMap<u64, u64>, max_snapshots: usize,
384 snapshot_interval: u64, }
386
387impl SnapshotManager {
388 pub fn new(max_snapshots: usize) -> Self {
389 Self {
390 snapshots: BTreeMap::new(),
391 access_counts: HashMap::new(),
392 max_snapshots,
393 snapshot_interval: 3600, }
395 }
396
397 pub fn get_snapshot(&mut self, as_of: u64) -> Option<Cid> {
399 *self.access_counts.entry(as_of).or_insert(0) += 1;
401
402 self.snapshots.range(..=as_of)
404 .next_back()
405 .map(|(_, cid)| *cid)
406 }
407
408 pub fn create_snapshot(&mut self, as_of: u64, data_cid: Cid) {
410 while self.snapshots.len() >= self.max_snapshots {
412 if let Some(oldest_ts) = self.snapshots.iter().next().map(|(k, _)| *k) {
413 self.snapshots.remove(&oldest_ts);
414 self.access_counts.remove(&oldest_ts);
415 }
416 }
417
418 self.snapshots.insert(as_of, data_cid);
419 }
420
421 pub fn get_popular_timestamps(&self, top_k: usize) -> Vec<u64> {
423 let mut popular: Vec<(u64, u64)> = self.access_counts.iter()
424 .map(|(ts, count)| (*ts, *count))
425 .collect();
426
427 popular.sort_by(|a, b| b.1.cmp(&a.1)); popular.into_iter()
430 .take(top_k)
431 .map(|(ts, _)| ts)
432 .collect()
433 }
434}
435
436pub mod simd_varint {
440 use fcdb_core::varint;
441
442 pub fn encode_simd(values: &[u64]) -> Vec<u8> {
444 let mut result = Vec::new();
445 for &value in values {
446 varint::encode_u64(value, &mut result);
447 }
448 result
449 }
450
451 pub fn decode_simd(data: &[u8]) -> Vec<u64> {
453 let mut result = Vec::new();
454 let mut reader = data;
455 while !reader.is_empty() {
456 if let Ok(value) = varint::decode_u64(&mut reader) {
457 result.push(value);
458 } else {
459 break;
460 }
461 }
462 result
463 }
464}
465
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_adaptive_bloom() {
        let mut bloom = AdaptiveBloomSystem::new(AdaptiveBloomConfig::default());
        let cid = Cid([1u8; 32]);

        bloom.insert(&cid, 1, 100, 1234567890);
        assert!(bloom.contains(&cid, Some(1), Some((100, 1234567890))));
        // Bloom filters may report false positives, but at the default 1e-6
        // target rate this absent CID is expected to be rejected.
        assert!(!bloom.contains(&Cid([2u8; 32]), None, None));
    }

    #[test]
    fn test_plan_switcher() {
        let mut switcher = PlanSwitcher::new();

        let plans = vec![
            QueryPlan::PathFirst(vec!["user".to_string()]),
            QueryPlan::TypeFirst(vec!["User".to_string()]),
        ];

        let selected = switcher.select_plan("test_query", &plans);
        assert!(matches!(selected, QueryPlan::PathFirst(_) | QueryPlan::TypeFirst(_)));

        switcher.record_result("test_query", &plans[0], 10.0, 100, true);

        // `select_plan` returns a random plan with probability `epsilon`. With
        // the default of 0.1 the assertion below would fail in roughly 5% of
        // runs, so disable exploration to make the test deterministic.
        switcher.epsilon = 0.0;
        let selected2 = switcher.select_plan("test_query", &plans);
        assert!(matches!(selected2, QueryPlan::PathFirst(_)));
    }

    #[test]
    fn test_plan_switcher_prefers_faster_successful_plan() {
        let mut switcher = PlanSwitcher::new();
        switcher.epsilon = 0.0;

        let plans = vec![
            QueryPlan::PathFirst(vec!["user".to_string()]),
            QueryPlan::TypeFirst(vec!["User".to_string()]),
        ];

        switcher.record_result("q", &plans[0], 50.0, 10, true);
        switcher.record_result("q", &plans[1], 5.0, 10, true);
        // Failed runs do not count toward the average.
        switcher.record_result("q", &plans[1], 500.0, 0, false);

        assert!(matches!(switcher.select_plan("q", &plans), QueryPlan::TypeFirst(_)));
    }

    #[test]
    fn test_meet_in_middle() {
        let mim = MeetInMiddle::new();
        let query_path = &["user", "posts", "comments", "replies"];

        if let Some(split) = mim.split_query(query_path, &[]) {
            assert_eq!(split.left_path.len() + split.right_path.len(), query_path.len());
            assert!(split.estimated_cost > 0.0);
        } else {
            panic!("Should split this query");
        }
    }

    #[test]
    fn test_snapshot_manager() {
        let mut manager = SnapshotManager::new(10);
        let cid = Cid([42u8; 32]);

        manager.create_snapshot(1000, cid);
        assert_eq!(manager.get_snapshot(1000), Some(cid));
        assert_eq!(manager.get_snapshot(1500), Some(cid));

        let popular = manager.get_popular_timestamps(5);
        assert!(!popular.is_empty());
    }

    #[test]
    fn test_snapshot_manager_evicts_oldest() {
        let mut manager = SnapshotManager::new(2);
        manager.create_snapshot(10, Cid([1u8; 32]));
        manager.create_snapshot(20, Cid([2u8; 32]));
        manager.create_snapshot(30, Cid([3u8; 32]));

        assert_eq!(manager.get_snapshot(15), None);
        assert_eq!(manager.get_snapshot(25), Some(Cid([2u8; 32])));
        assert_eq!(manager.get_snapshot(35), Some(Cid([3u8; 32])));
    }
}