ruvector_sona/loops/
instant.rs1use crate::lora::MicroLoRA;
6use crate::trajectory::{TrajectoryBuffer, TrajectoryIdGen};
7use crate::types::{LearningSignal, QueryTrajectory, SonaConfig};
8use parking_lot::RwLock;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11
/// Tunable parameters for an [`InstantLoop`].
#[derive(Debug, Clone)]
pub struct InstantLoopConfig {
    /// Rank handed to `MicroLoRA::new` when the loop is constructed.
    pub micro_lora_rank: usize,
    /// Learning rate passed to `MicroLoRA::apply_accumulated` on every flush.
    pub micro_lora_lr: f32,
    /// Capacity handed to `TrajectoryBuffer::new` when the loop is constructed.
    pub buffer_capacity: usize,
    /// Number of accumulated signals at which `on_trajectory` flushes automatically.
    pub flush_threshold: usize,
}
24
25impl Default for InstantLoopConfig {
26 fn default() -> Self {
27 Self {
28 micro_lora_rank: 1,
29 micro_lora_lr: 0.001,
30 buffer_capacity: 10000,
31 flush_threshold: 100,
32 }
33 }
34}
35
36impl From<&SonaConfig> for InstantLoopConfig {
37 fn from(config: &SonaConfig) -> Self {
38 Self {
39 micro_lora_rank: config.micro_lora_rank,
40 micro_lora_lr: config.micro_lora_lr,
41 buffer_capacity: config.trajectory_capacity,
42 flush_threshold: 100,
43 }
44 }
45}
46
/// Atomic counters describing the activity of an [`InstantLoop`].
///
/// All counters start at zero via `Default`.
#[derive(Default, Debug)]
pub struct InstantLoopMetrics {
    /// Incremented once for every call to `InstantLoop::on_trajectory`.
    pub trajectories_processed: AtomicU64,
    /// Incremented once for every learning signal accumulated into the MicroLoRA.
    pub signals_accumulated: AtomicU64,
    /// Incremented once for every flush that had pending updates to apply.
    pub flushes_performed: AtomicU64,
    /// Running total of the pending-update counts applied across all flushes.
    pub updates_applied: AtomicU64,
}
59
60pub struct InstantLoop {
62 config: InstantLoopConfig,
64 trajectory_buffer: Arc<TrajectoryBuffer>,
66 micro_lora: Arc<RwLock<MicroLoRA>>,
68 id_gen: TrajectoryIdGen,
70 pending_signals: AtomicU64,
72 pub metrics: InstantLoopMetrics,
74}
75
76impl InstantLoop {
77 pub fn new(hidden_dim: usize, config: InstantLoopConfig) -> Self {
79 Self {
80 trajectory_buffer: Arc::new(TrajectoryBuffer::new(config.buffer_capacity)),
81 micro_lora: Arc::new(RwLock::new(MicroLoRA::new(hidden_dim, config.micro_lora_rank))),
82 id_gen: TrajectoryIdGen::new(),
83 pending_signals: AtomicU64::new(0),
84 config,
85 metrics: InstantLoopMetrics::default(),
86 }
87 }
88
89 pub fn from_sona_config(config: &SonaConfig) -> Self {
91 Self::new(config.hidden_dim, InstantLoopConfig::from(config))
92 }
93
94 pub fn next_id(&self) -> u64 {
96 self.id_gen.next()
97 }
98
99 pub fn on_trajectory(&self, trajectory: QueryTrajectory) {
101 self.trajectory_buffer.record(trajectory.clone());
103 self.metrics.trajectories_processed.fetch_add(1, Ordering::Relaxed);
104
105 let signal = LearningSignal::from_trajectory(&trajectory);
107
108 if let Some(mut lora) = self.micro_lora.try_write() {
110 lora.accumulate_gradient(&signal);
111 self.metrics.signals_accumulated.fetch_add(1, Ordering::Relaxed);
112
113 let pending = self.pending_signals.fetch_add(1, Ordering::Relaxed) + 1;
114
115 if pending >= self.config.flush_threshold as u64 {
117 self.flush_internal(&mut lora);
118 }
119 }
120 }
121
122 pub fn flush(&self) {
124 if let Some(mut lora) = self.micro_lora.try_write() {
125 self.flush_internal(&mut lora);
126 }
127 }
128
129 fn flush_internal(&self, lora: &mut MicroLoRA) {
130 let pending = lora.pending_updates();
131 if pending > 0 {
132 lora.apply_accumulated(self.config.micro_lora_lr);
133 self.pending_signals.store(0, Ordering::Relaxed);
134 self.metrics.flushes_performed.fetch_add(1, Ordering::Relaxed);
135 self.metrics.updates_applied.fetch_add(pending as u64, Ordering::Relaxed);
136 }
137 }
138
139 pub fn drain_trajectories(&self) -> Vec<QueryTrajectory> {
141 self.trajectory_buffer.drain()
142 }
143
144 pub fn drain_trajectories_n(&self, n: usize) -> Vec<QueryTrajectory> {
146 self.trajectory_buffer.drain_n(n)
147 }
148
149 pub fn micro_lora(&self) -> &Arc<RwLock<MicroLoRA>> {
151 &self.micro_lora
152 }
153
154 pub fn buffer(&self) -> &Arc<TrajectoryBuffer> {
156 &self.trajectory_buffer
157 }
158
159 pub fn pending_count(&self) -> usize {
161 self.trajectory_buffer.len()
162 }
163
164 pub fn buffer_stats(&self) -> (usize, u64, f64) {
166 (
167 self.trajectory_buffer.len(),
168 self.trajectory_buffer.dropped_count(),
169 self.trajectory_buffer.success_rate(),
170 )
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::TrajectoryStep;

    /// Builds a finalized, single-step trajectory with 64-element vectors.
    fn make_trajectory(id: u64) -> QueryTrajectory {
        let mut trajectory = QueryTrajectory::new(id, vec![0.1; 64]);
        let step = TrajectoryStep::new(vec![0.5; 64], vec![], 0.8, 0);
        trajectory.add_step(step);
        trajectory.finalize(0.8, 1000);
        trajectory
    }

    /// A freshly built loop has nothing buffered.
    #[test]
    fn test_instant_loop_creation() {
        let instant = InstantLoop::new(64, InstantLoopConfig::default());
        assert_eq!(instant.pending_count(), 0);
    }

    /// One trajectory is buffered and counted after `on_trajectory`.
    #[test]
    fn test_trajectory_processing() {
        let instant = InstantLoop::new(64, InstantLoopConfig::default());

        let id = instant.next_id();
        instant.on_trajectory(make_trajectory(id));

        let processed = instant.metrics.trajectories_processed.load(Ordering::Relaxed);
        assert_eq!(instant.pending_count(), 1);
        assert_eq!(processed, 1);
    }

    /// With a threshold of 3, feeding 5 trajectories triggers at least one flush.
    #[test]
    fn test_auto_flush() {
        let instant = InstantLoop::new(
            64,
            InstantLoopConfig {
                flush_threshold: 3,
                ..Default::default()
            },
        );

        (0..5)
            .map(make_trajectory)
            .for_each(|trajectory| instant.on_trajectory(trajectory));

        let flushes = instant.metrics.flushes_performed.load(Ordering::Relaxed);
        assert!(flushes >= 1);
    }

    /// Draining returns every buffered trajectory and leaves the buffer empty.
    #[test]
    fn test_drain() {
        let instant = InstantLoop::new(64, InstantLoopConfig::default());

        (0..10)
            .map(make_trajectory)
            .for_each(|trajectory| instant.on_trajectory(trajectory));

        let drained = instant.drain_trajectories();
        assert_eq!(drained.len(), 10);
        assert_eq!(instant.pending_count(), 0);
    }
}