1use anyhow::{anyhow, Result};
48use chrono::{DateTime, Datelike, Timelike, Utc};
49use rayon::prelude::*;
50use serde::{Deserialize, Serialize};
51use std::collections::BTreeMap;
52
53#[derive(Clone, Debug, Serialize, Deserialize)]
55pub struct TimeSeriesEntry {
56 pub id: String,
57 pub vector: Vec<f32>,
58 pub timestamp: i64, }
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum DecayFunction {
64 None,
66
67 Exponential { half_life: f64 },
70
71 Linear { max_age: f64 },
74
75 Gaussian { sigma: f64 },
78}
79
80impl DecayFunction {
81 pub fn apply(&self, score: f32, age_seconds: f64) -> f32 {
83 match self {
84 DecayFunction::None => score,
85
86 DecayFunction::Exponential { half_life } => {
87 let lambda = 0.693147 / half_life; score * (-lambda * age_seconds).exp() as f32
89 }
90
91 DecayFunction::Linear { max_age } => {
92 let decay = (1.0 - age_seconds / max_age).max(0.0);
93 score * decay as f32
94 }
95
96 DecayFunction::Gaussian { sigma } => {
97 let exponent = -(age_seconds.powi(2)) / (2.0 * sigma.powi(2));
98 score * exponent.exp() as f32
99 }
100 }
101 }
102}
103
/// Calendar component (of a UTC timestamp) used to bucket entries when grouping by pattern.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TemporalGroup {
    /// Hour of the day.
    HourOfDay,
    /// Day of the week, counted from Monday.
    DayOfWeek,
    /// Day of the month.
    DayOfMonth,
    /// Month of the year.
    MonthOfYear,
}
112
113#[derive(Clone)]
115pub struct TimeQuery {
116 pub vector: Vec<f32>,
118
119 pub limit: usize,
121
122 pub after: Option<i64>,
124
125 pub before: Option<i64>,
127
128 pub decay: DecayFunction,
130
131 pub reference_time: Option<i64>,
133}
134
135impl TimeQuery {
136 pub fn new(vector: Vec<f32>) -> Self {
138 Self {
139 vector,
140 limit: 10,
141 after: None,
142 before: None,
143 decay: DecayFunction::None,
144 reference_time: None,
145 }
146 }
147
148 pub fn with_limit(mut self, limit: usize) -> Self {
150 self.limit = limit;
151 self
152 }
153
154 pub fn after(mut self, timestamp: i64) -> Self {
156 self.after = Some(timestamp);
157 self
158 }
159
160 pub fn before(mut self, timestamp: i64) -> Self {
162 self.before = Some(timestamp);
163 self
164 }
165
166 pub fn with_time_decay(mut self, decay: DecayFunction) -> Self {
168 self.decay = decay;
169 self
170 }
171
172 pub fn with_reference_time(mut self, timestamp: i64) -> Self {
174 self.reference_time = Some(timestamp);
175 self
176 }
177}
178
179pub struct TimeSeriesIndex {
184 dimension: usize,
186
187 entries: BTreeMap<i64, Vec<TimeSeriesEntry>>,
189
190 num_vectors: usize,
192}
193
194impl TimeSeriesIndex {
195 pub fn new(dimension: usize) -> Result<Self> {
197 Ok(Self {
198 dimension,
199 entries: BTreeMap::new(),
200 num_vectors: 0,
201 })
202 }
203
204 pub fn add(&mut self, id: impl Into<String>, vector: Vec<f32>, timestamp: i64) -> Result<()> {
211 if vector.len() != self.dimension {
212 return Err(anyhow!(
213 "Vector dimension {} doesn't match index dimension {}",
214 vector.len(),
215 self.dimension
216 ));
217 }
218
219 let entry = TimeSeriesEntry {
220 id: id.into(),
221 vector,
222 timestamp,
223 };
224
225 self.entries
226 .entry(timestamp)
227 .or_insert_with(Vec::new)
228 .push(entry);
229
230 self.num_vectors += 1;
231
232 Ok(())
233 }
234
235 pub fn add_batch(&mut self, entries: Vec<(String, Vec<f32>, i64)>) -> Result<()> {
237 for (id, vector, timestamp) in entries {
238 self.add(id, vector, timestamp)?;
239 }
240 Ok(())
241 }
242
243 pub fn search(&self, query: &TimeQuery) -> Result<Vec<TimeSeriesResult>> {
245 if query.vector.len() != self.dimension {
246 return Err(anyhow!(
247 "Query dimension {} doesn't match index dimension {}",
248 query.vector.len(),
249 self.dimension
250 ));
251 }
252
253 let reference_time = query
254 .reference_time
255 .unwrap_or_else(|| Utc::now().timestamp());
256
257 let range_start = query.after.unwrap_or(i64::MIN);
259 let range_end = query.before.unwrap_or(i64::MAX);
260
261 let mut results: Vec<TimeSeriesResult> = self
262 .entries
263 .range(range_start..=range_end)
264 .flat_map(|(_, entries)| entries)
265 .par_bridge()
266 .map(|entry| {
267 let distance = euclidean_distance(&query.vector, &entry.vector);
269 let similarity = 1.0 / (1.0 + distance); let age_seconds = (reference_time - entry.timestamp).abs() as f64;
273 let score = query.decay.apply(similarity, age_seconds);
274
275 TimeSeriesResult {
276 id: entry.id.clone(),
277 score,
278 distance,
279 timestamp: entry.timestamp,
280 age_seconds,
281 }
282 })
283 .collect();
284
285 results.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
287 results.truncate(query.limit);
288
289 Ok(results)
290 }
291
292 pub fn search_windows(
302 &self,
303 query_vector: &[f32],
304 window_size: i64,
305 k: usize,
306 ) -> Result<Vec<WindowResult>> {
307 if query_vector.len() != self.dimension {
308 return Err(anyhow!("Query dimension mismatch"));
309 }
310
311 if self.entries.is_empty() {
312 return Ok(Vec::new());
313 }
314
315 let mut windows = Vec::new();
316
317 let min_time = *self.entries.keys().next().unwrap();
319 let max_time = *self.entries.keys().last().unwrap();
320
321 let mut window_start = min_time;
323 while window_start <= max_time {
324 let window_end = window_start + window_size;
325
326 let query = TimeQuery::new(query_vector.to_vec())
327 .with_limit(k)
328 .after(window_start)
329 .before(window_end);
330
331 let results = self.search(&query)?;
332
333 if !results.is_empty() {
334 windows.push(WindowResult {
335 window_start,
336 window_end,
337 results,
338 });
339 }
340
341 window_start += window_size;
342 }
343
344 Ok(windows)
345 }
346
347 pub fn group_by_pattern(&self, grouping: TemporalGroup) -> BTreeMap<i64, Vec<String>> {
351 let mut groups: BTreeMap<i64, Vec<String>> = BTreeMap::new();
352
353 for (_, entries) in &self.entries {
354 for entry in entries {
355 let dt = DateTime::from_timestamp(entry.timestamp, 0).unwrap_or_else(|| Utc::now());
356
357 let group_key = match grouping {
358 TemporalGroup::HourOfDay => dt.hour() as i64,
359 TemporalGroup::DayOfWeek => dt.weekday().num_days_from_monday() as i64,
360 TemporalGroup::DayOfMonth => dt.day() as i64,
361 TemporalGroup::MonthOfYear => dt.month() as i64,
362 };
363
364 groups
365 .entry(group_key)
366 .or_insert_with(Vec::new)
367 .push(entry.id.clone());
368 }
369 }
370
371 groups
372 }
373
374 pub fn remove(&mut self, id: &str) -> Result<bool> {
376 let mut found = false;
377
378 for (_, entries) in &mut self.entries {
379 if let Some(pos) = entries.iter().position(|e| e.id == id) {
380 entries.remove(pos);
381 found = true;
382 self.num_vectors = self.num_vectors.saturating_sub(1);
383 break;
384 }
385 }
386
387 Ok(found)
388 }
389
390 pub fn stats(&self) -> TimeSeriesStats {
392 let mut min_timestamp = i64::MAX;
393 let mut max_timestamp = i64::MIN;
394 let mut timestamps_with_data = 0;
395
396 for (×tamp, entries) in &self.entries {
397 if !entries.is_empty() {
398 min_timestamp = min_timestamp.min(timestamp);
399 max_timestamp = max_timestamp.max(timestamp);
400 timestamps_with_data += 1;
401 }
402 }
403
404 let time_span_seconds = if min_timestamp != i64::MAX {
405 (max_timestamp - min_timestamp).max(0)
406 } else {
407 0
408 };
409
410 let avg_vectors_per_timestamp = if timestamps_with_data > 0 {
411 self.num_vectors as f32 / timestamps_with_data as f32
412 } else {
413 0.0
414 };
415
416 TimeSeriesStats {
417 num_vectors: self.num_vectors,
418 num_unique_timestamps: self.entries.len(),
419 min_timestamp: if min_timestamp != i64::MAX {
420 Some(min_timestamp)
421 } else {
422 None
423 },
424 max_timestamp: if max_timestamp != i64::MIN {
425 Some(max_timestamp)
426 } else {
427 None
428 },
429 time_span_seconds,
430 avg_vectors_per_timestamp,
431 }
432 }
433
434 pub fn len(&self) -> usize {
436 self.num_vectors
437 }
438
439 pub fn is_empty(&self) -> bool {
441 self.num_vectors == 0
442 }
443
444 pub fn dimension(&self) -> usize {
446 self.dimension
447 }
448}
449
/// A single scored match returned by a search.
#[derive(Clone, Debug)]
pub struct TimeSeriesResult {
    /// Identifier of the matched entry.
    pub id: String,
    /// Similarity after age decay; higher is better.
    pub score: f32,
    /// Raw euclidean distance between the query and the entry vector.
    pub distance: f32,
    /// Timestamp of the matched entry.
    pub timestamp: i64,
    /// Absolute age of the entry relative to the query's reference time, in seconds.
    pub age_seconds: f64,
}
459
460#[derive(Debug, Clone)]
462pub struct WindowResult {
463 pub window_start: i64,
464 pub window_end: i64,
465 pub results: Vec<TimeSeriesResult>,
466}
467
/// Summary statistics describing the contents of an index.
#[derive(Clone, Debug)]
pub struct TimeSeriesStats {
    /// Total number of stored entries.
    pub num_vectors: usize,
    /// Number of distinct timestamps reported by the index.
    pub num_unique_timestamps: usize,
    /// Earliest timestamp holding data, if any.
    pub min_timestamp: Option<i64>,
    /// Latest timestamp holding data, if any.
    pub max_timestamp: Option<i64>,
    /// Seconds between the earliest and latest timestamps (0 when empty).
    pub time_span_seconds: i64,
    /// Mean number of entries per timestamp (0.0 when empty).
    pub avg_vectors_per_timestamp: f32,
}
478
/// Euclidean (L2) distance between `a` and `b`.
///
/// Components are paired positionally; if the slices differ in length, the extra
/// trailing components of the longer one are ignored.
fn euclidean_distance(a: &[f32], b: &[f32]) -> f32 {
    let squared_sum: f32 = std::iter::zip(a, b)
        .map(|(&lhs, &rhs)| (lhs - rhs).powi(2))
        .sum();
    squared_sum.sqrt()
}
487
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Builds `n` vectors of dimension `dim`, one per hour starting at `start_time`.
    fn generate_test_vectors(
        n: usize,
        dim: usize,
        start_time: i64,
    ) -> Vec<(String, Vec<f32>, i64)> {
        (0..n)
            .map(|i| {
                let vector = vec![i as f32 / n as f32; dim];
                let timestamp = start_time + (i as i64 * 3600);
                (format!("vec_{}", i), vector, timestamp)
            })
            .collect()
    }

    #[test]
    fn test_timeseries_basic() {
        let mut index = TimeSeriesIndex::new(64).unwrap();
        let now = Utc::now().timestamp();

        for i in 0..10 {
            let vector = vec![i as f32 / 10.0; 64];
            let timestamp = now - (i * 3600);
            index.add(format!("vec_{}", i), vector, timestamp).unwrap();
        }

        assert_eq!(index.len(), 10);

        let query = TimeQuery::new(vec![0.5; 64]).with_limit(5);
        let results = index.search(&query).unwrap();

        assert_eq!(results.len(), 5);
        // Results are ordered by descending score.
        assert!(results.windows(2).all(|w| w[0].score >= w[1].score));
    }

    #[test]
    fn test_timeseries_time_range() {
        let mut index = TimeSeriesIndex::new(32).unwrap();
        let now = Utc::now().timestamp();

        for i in 0..10 {
            index
                .add(format!("vec_{}", i), vec![i as f32; 32], now - (i * 3600))
                .unwrap();
        }

        let cutoff = now - (5 * 3600);
        let query = TimeQuery::new(vec![3.0; 32]).with_limit(10).after(cutoff);

        let results = index.search(&query).unwrap();

        // Entries i = 0..=5 lie at or after the inclusive cutoff.
        assert_eq!(results.len(), 6);
        for result in &results {
            assert!(result.timestamp >= cutoff);
        }
    }

    #[test]
    fn test_decay_functions() {
        let decay = DecayFunction::Exponential { half_life: 3600.0 };
        assert!((decay.apply(1.0, 3600.0) - 0.5).abs() < 0.01);
        assert_eq!(decay.apply(1.0, 0.0), 1.0);

        let decay = DecayFunction::Linear { max_age: 7200.0 };
        assert!((decay.apply(1.0, 3600.0) - 0.5).abs() < 0.01);
        assert_eq!(decay.apply(1.0, 10_000.0), 0.0);

        let decay = DecayFunction::Gaussian { sigma: 3600.0 };
        // exp(-0.5) ~= 0.6065 at one sigma.
        assert!((decay.apply(1.0, 3600.0) - 0.6065).abs() < 0.01);

        let decay = DecayFunction::None;
        assert_eq!(decay.apply(1.0, 10000.0), 1.0);
    }

    #[test]
    fn test_timeseries_with_decay() {
        let mut index = TimeSeriesIndex::new(64).unwrap();
        let now = Utc::now().timestamp();

        index.add("recent", vec![0.5; 64], now).unwrap();
        index.add("old", vec![0.5; 64], now - 7200).unwrap();

        let query = TimeQuery::new(vec![0.5; 64])
            .with_limit(10)
            .with_time_decay(DecayFunction::Exponential { half_life: 3600.0 })
            .with_reference_time(now);

        let results = index.search(&query).unwrap();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].id, "recent");
        assert!(results[0].score > results[1].score);
    }

    #[test]
    fn test_batch_add() {
        let mut index = TimeSeriesIndex::new(32).unwrap();
        let now = Utc::now().timestamp();

        let batch = generate_test_vectors(20, 32, now);
        index.add_batch(batch).unwrap();

        assert_eq!(index.len(), 20);
        assert_eq!(index.stats().num_unique_timestamps, 20);
    }

    #[test]
    fn test_dimension_mismatch_is_rejected() {
        let mut index = TimeSeriesIndex::new(4).unwrap();

        assert!(index.add("short", vec![0.0; 3], 0).is_err());
        assert!(index
            .add_batch(vec![("bad".to_string(), vec![0.0; 3], 0)])
            .is_err());
        assert!(index.is_empty());

        assert!(index.search(&TimeQuery::new(vec![0.0; 5])).is_err());
        assert!(index.search_windows(&[0.0; 5], 60, 1).is_err());
    }

    #[test]
    fn test_remove() {
        let mut index = TimeSeriesIndex::new(32).unwrap();
        let now = Utc::now().timestamp();

        index.add("vec_1", vec![0.1; 32], now).unwrap();
        index.add("vec_2", vec![0.2; 32], now + 100).unwrap();

        assert_eq!(index.len(), 2);

        let removed = index.remove("vec_1").unwrap();
        assert!(removed);
        assert_eq!(index.len(), 1);

        let removed = index.remove("vec_1").unwrap();
        assert!(!removed);

        assert!(index.remove("vec_2").unwrap());
        assert!(index.is_empty());
    }

    #[test]
    fn test_stats() {
        let mut index = TimeSeriesIndex::new(64).unwrap();
        let now = Utc::now().timestamp();

        for i in 0..10 {
            index
                .add(format!("vec_{}", i), vec![i as f32; 64], now + (i * 1000))
                .unwrap();
        }

        let stats = index.stats();

        assert_eq!(stats.num_vectors, 10);
        assert_eq!(stats.num_unique_timestamps, 10);
        assert_eq!(stats.min_timestamp, Some(now));
        assert_eq!(stats.max_timestamp, Some(now + 9000));
        assert_eq!(stats.time_span_seconds, 9000);
        assert!((stats.avg_vectors_per_timestamp - 1.0).abs() < f32::EPSILON);
    }

    #[test]
    fn test_stats_empty() {
        let index = TimeSeriesIndex::new(8).unwrap();
        let stats = index.stats();

        assert_eq!(stats.num_vectors, 0);
        assert_eq!(stats.min_timestamp, None);
        assert_eq!(stats.max_timestamp, None);
        assert_eq!(stats.time_span_seconds, 0);
        assert_eq!(stats.avg_vectors_per_timestamp, 0.0);
    }

    #[test]
    fn test_window_search() {
        let mut index = TimeSeriesIndex::new(32).unwrap();
        let now = Utc::now().timestamp();

        for i in 0..10 {
            index
                .add(format!("vec_{}", i), vec![i as f32; 32], now + (i * 3600))
                .unwrap();
        }

        let query_vector = vec![5.0; 32];
        let windows = index.search_windows(&query_vector, 7200, 5).unwrap();

        assert!(!windows.is_empty());

        let mut seen = HashSet::new();
        for window in &windows {
            assert_eq!(window.window_end - window.window_start, 7200);
            for result in &window.results {
                assert!(result.timestamp >= window.window_start);
                assert!(result.timestamp <= window.window_end);
                seen.insert(result.id.clone());
            }
        }
        // Every entry lands in at least one window.
        assert_eq!(seen.len(), 10);
    }

    #[test]
    fn test_temporal_grouping() {
        let mut index = TimeSeriesIndex::new(32).unwrap();

        let base_time = DateTime::parse_from_rfc3339("2024-01-15T10:00:00Z")
            .unwrap()
            .timestamp();

        for i in 0..24 {
            index
                .add(format!("vec_{}", i), vec![i as f32; 32], base_time + (i * 3600))
                .unwrap();
        }

        let groups = index.group_by_pattern(TemporalGroup::HourOfDay);

        assert_eq!(groups.len(), 24);
        // 24 consecutive hours: exactly one entry per hour-of-day bucket.
        assert!(groups.values().all(|ids| ids.len() == 1));
        assert!(groups.keys().all(|&hour| (0..24).contains(&hour)));
    }
}