Skip to main content

ruvector_dag/sona/
trajectory.rs

1//! Trajectory Buffer: Lock-free buffer for learning trajectories
2
3use crossbeam::queue::ArrayQueue;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6/// A single learning trajectory
7#[derive(Debug, Clone)]
8pub struct DagTrajectory {
9    pub query_hash: u64,
10    pub dag_embedding: Vec<f32>,
11    pub attention_mechanism: String,
12    pub execution_time_ms: f64,
13    pub improvement_ratio: f32,
14    pub timestamp: std::time::Instant,
15}
16
17impl DagTrajectory {
18    pub fn new(
19        query_hash: u64,
20        dag_embedding: Vec<f32>,
21        attention_mechanism: String,
22        execution_time_ms: f64,
23        baseline_time_ms: f64,
24    ) -> Self {
25        let improvement_ratio = if baseline_time_ms > 0.0 {
26            (baseline_time_ms - execution_time_ms) as f32 / baseline_time_ms as f32
27        } else {
28            0.0
29        };
30
31        Self {
32            query_hash,
33            dag_embedding,
34            attention_mechanism,
35            execution_time_ms,
36            improvement_ratio,
37            timestamp: std::time::Instant::now(),
38        }
39    }
40
41    /// Compute quality score (0-1)
42    pub fn quality(&self) -> f32 {
43        // Quality based on improvement and execution time
44        let time_score = 1.0 / (1.0 + self.execution_time_ms as f32 / 1000.0);
45        let improvement_score = (self.improvement_ratio + 1.0) / 2.0;
46        0.5 * time_score + 0.5 * improvement_score
47    }
48}
49
50/// Lock-free trajectory buffer
51pub struct DagTrajectoryBuffer {
52    queue: ArrayQueue<DagTrajectory>,
53    count: AtomicUsize,
54    #[allow(dead_code)]
55    capacity: usize,
56}
57
58impl DagTrajectoryBuffer {
59    pub fn new(capacity: usize) -> Self {
60        Self {
61            queue: ArrayQueue::new(capacity),
62            count: AtomicUsize::new(0),
63            capacity,
64        }
65    }
66
67    /// Push trajectory, dropping oldest if full
68    pub fn push(&self, trajectory: DagTrajectory) {
69        if self.queue.push(trajectory.clone()).is_err() {
70            // Queue full, pop oldest and retry
71            let _ = self.queue.pop();
72            let _ = self.queue.push(trajectory);
73        }
74        self.count.fetch_add(1, Ordering::Relaxed);
75    }
76
77    /// Drain all trajectories for processing
78    pub fn drain(&self) -> Vec<DagTrajectory> {
79        let mut result = Vec::with_capacity(self.queue.len());
80        while let Some(t) = self.queue.pop() {
81            result.push(t);
82        }
83        result
84    }
85
86    pub fn len(&self) -> usize {
87        self.queue.len()
88    }
89
90    pub fn is_empty(&self) -> bool {
91        self.queue.is_empty()
92    }
93
94    pub fn total_count(&self) -> usize {
95        self.count.load(Ordering::Relaxed)
96    }
97}