ruvector_sona/loops/
instant.rs

1//! Loop A - Instant Learning
2//!
3//! Per-request adaptation with <1ms overhead.
4
5use crate::lora::MicroLoRA;
6use crate::trajectory::{TrajectoryBuffer, TrajectoryIdGen};
7use crate::types::{LearningSignal, QueryTrajectory, SonaConfig};
8use parking_lot::RwLock;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11
/// Tunable settings for the instant learning loop (Loop A).
///
/// A value of this type is handed to [`InstantLoop::new`], which keeps it for
/// the lifetime of the loop.
#[derive(Debug, Clone)]
pub struct InstantLoopConfig {
    /// Rank handed to `MicroLoRA::new` when the adapter is created.
    pub micro_lora_rank: usize,
    /// Learning rate passed to `MicroLoRA::apply_accumulated` on every flush.
    pub micro_lora_lr: f32,
    /// Capacity handed to `TrajectoryBuffer::new`.
    pub buffer_capacity: usize,
    /// Number of accumulated signals that triggers an automatic flush.
    pub flush_threshold: usize,
}
24
25impl Default for InstantLoopConfig {
26    fn default() -> Self {
27        Self {
28            micro_lora_rank: 1,
29            micro_lora_lr: 0.001,
30            buffer_capacity: 10000,
31            flush_threshold: 100,
32        }
33    }
34}
35
36impl From<&SonaConfig> for InstantLoopConfig {
37    fn from(config: &SonaConfig) -> Self {
38        Self {
39            micro_lora_rank: config.micro_lora_rank,
40            micro_lora_lr: config.micro_lora_lr,
41            buffer_capacity: config.trajectory_capacity,
42            flush_threshold: 100,
43        }
44    }
45}
46
/// Counters describing the activity of an [`InstantLoop`].
///
/// Every field is an atomic so it can be bumped through a shared reference;
/// the loop itself updates them with `Ordering::Relaxed`.
#[derive(Default, Debug)]
pub struct InstantLoopMetrics {
    /// Incremented once for every trajectory handed to `on_trajectory`.
    pub trajectories_processed: AtomicU64,
    /// Incremented each time a signal's gradient was accumulated into the adapter.
    pub signals_accumulated: AtomicU64,
    /// Incremented for every flush that actually applied pending updates.
    pub flushes_performed: AtomicU64,
    /// Running sum of the pending-update counts applied by those flushes.
    pub updates_applied: AtomicU64,
}
59
60/// Instant learning loop (Loop A)
61pub struct InstantLoop {
62    /// Configuration
63    config: InstantLoopConfig,
64    /// Trajectory buffer
65    trajectory_buffer: Arc<TrajectoryBuffer>,
66    /// Micro-LoRA adapter
67    micro_lora: Arc<RwLock<MicroLoRA>>,
68    /// ID generator
69    id_gen: TrajectoryIdGen,
70    /// Pending signal count
71    pending_signals: AtomicU64,
72    /// Metrics
73    pub metrics: InstantLoopMetrics,
74}
75
76impl InstantLoop {
77    /// Create new instant loop
78    pub fn new(hidden_dim: usize, config: InstantLoopConfig) -> Self {
79        Self {
80            trajectory_buffer: Arc::new(TrajectoryBuffer::new(config.buffer_capacity)),
81            micro_lora: Arc::new(RwLock::new(MicroLoRA::new(hidden_dim, config.micro_lora_rank))),
82            id_gen: TrajectoryIdGen::new(),
83            pending_signals: AtomicU64::new(0),
84            config,
85            metrics: InstantLoopMetrics::default(),
86        }
87    }
88
89    /// Create from SONA config
90    pub fn from_sona_config(config: &SonaConfig) -> Self {
91        Self::new(config.hidden_dim, InstantLoopConfig::from(config))
92    }
93
94    /// Generate next trajectory ID
95    pub fn next_id(&self) -> u64 {
96        self.id_gen.next()
97    }
98
99    /// Process completed trajectory
100    pub fn on_trajectory(&self, trajectory: QueryTrajectory) {
101        // Record to buffer
102        self.trajectory_buffer.record(trajectory.clone());
103        self.metrics.trajectories_processed.fetch_add(1, Ordering::Relaxed);
104
105        // Generate learning signal
106        let signal = LearningSignal::from_trajectory(&trajectory);
107
108        // Accumulate gradient (non-blocking)
109        if let Some(mut lora) = self.micro_lora.try_write() {
110            lora.accumulate_gradient(&signal);
111            self.metrics.signals_accumulated.fetch_add(1, Ordering::Relaxed);
112
113            let pending = self.pending_signals.fetch_add(1, Ordering::Relaxed) + 1;
114
115            // Auto-flush if threshold reached
116            if pending >= self.config.flush_threshold as u64 {
117                self.flush_internal(&mut lora);
118            }
119        }
120    }
121
122    /// Manually flush accumulated updates
123    pub fn flush(&self) {
124        if let Some(mut lora) = self.micro_lora.try_write() {
125            self.flush_internal(&mut lora);
126        }
127    }
128
129    fn flush_internal(&self, lora: &mut MicroLoRA) {
130        let pending = lora.pending_updates();
131        if pending > 0 {
132            lora.apply_accumulated(self.config.micro_lora_lr);
133            self.pending_signals.store(0, Ordering::Relaxed);
134            self.metrics.flushes_performed.fetch_add(1, Ordering::Relaxed);
135            self.metrics.updates_applied.fetch_add(pending as u64, Ordering::Relaxed);
136        }
137    }
138
139    /// Drain trajectories for background processing
140    pub fn drain_trajectories(&self) -> Vec<QueryTrajectory> {
141        self.trajectory_buffer.drain()
142    }
143
144    /// Drain up to N trajectories
145    pub fn drain_trajectories_n(&self, n: usize) -> Vec<QueryTrajectory> {
146        self.trajectory_buffer.drain_n(n)
147    }
148
149    /// Get micro-LoRA reference for inference
150    pub fn micro_lora(&self) -> &Arc<RwLock<MicroLoRA>> {
151        &self.micro_lora
152    }
153
154    /// Get trajectory buffer reference
155    pub fn buffer(&self) -> &Arc<TrajectoryBuffer> {
156        &self.trajectory_buffer
157    }
158
159    /// Get pending trajectory count
160    pub fn pending_count(&self) -> usize {
161        self.trajectory_buffer.len()
162    }
163
164    /// Get buffer stats
165    pub fn buffer_stats(&self) -> (usize, u64, f64) {
166        (
167            self.trajectory_buffer.len(),
168            self.trajectory_buffer.dropped_count(),
169            self.trajectory_buffer.success_rate(),
170        )
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::TrajectoryStep;

    /// Builds a finalized, single-step trajectory with 64-element vectors.
    fn make_trajectory(id: u64) -> QueryTrajectory {
        let mut trajectory = QueryTrajectory::new(id, vec![0.1; 64]);
        let step = TrajectoryStep::new(vec![0.5; 64], vec![], 0.8, 0);
        trajectory.add_step(step);
        trajectory.finalize(0.8, 1000);
        trajectory
    }

    /// A freshly built loop has nothing buffered.
    #[test]
    fn test_instant_loop_creation() {
        let instant = InstantLoop::new(64, InstantLoopConfig::default());
        assert_eq!(instant.pending_count(), 0);
    }

    /// One trajectory in means one buffered and one counted.
    #[test]
    fn test_trajectory_processing() {
        let instant = InstantLoop::new(64, InstantLoopConfig::default());

        let id = instant.next_id();
        instant.on_trajectory(make_trajectory(id));

        assert_eq!(instant.pending_count(), 1);
        assert_eq!(instant.metrics.trajectories_processed.load(Ordering::Relaxed), 1);
    }

    /// Feeding more signals than the threshold triggers at least one flush.
    #[test]
    fn test_auto_flush() {
        let instant = InstantLoop::new(
            64,
            InstantLoopConfig {
                flush_threshold: 3,
                ..Default::default()
            },
        );

        (0..5)
            .map(make_trajectory)
            .for_each(|trajectory| instant.on_trajectory(trajectory));

        let flushes = instant.metrics.flushes_performed.load(Ordering::Relaxed);
        assert!(flushes >= 1);
    }

    /// Draining returns everything that was recorded and empties the buffer.
    #[test]
    fn test_drain() {
        let instant = InstantLoop::new(64, InstantLoopConfig::default());

        (0..10)
            .map(make_trajectory)
            .for_each(|trajectory| instant.on_trajectory(trajectory));

        let drained = instant.drain_trajectories();
        assert_eq!(drained.len(), 10);
        assert_eq!(instant.pending_count(), 0);
    }
}
230}