1use std::sync::Arc;
14
15use ahash::AHashSet;
16use async_trait::async_trait;
17use feagi_npu_burst_engine::BurstLoopRunner;
18use feagi_structures::genomic::cortical_area::CorticalID;
19use parking_lot::RwLock;
20use tracing::{debug, info, warn};
21
22use crate::traits::runtime_service::ManualStimulationMode;
23use crate::traits::RuntimeService;
24use crate::types::{RuntimeStatus, ServiceError, ServiceResult};
25
26pub struct RuntimeServiceImpl {
30 burst_runner: Arc<RwLock<BurstLoopRunner>>,
31 paused: Arc<RwLock<bool>>,
32}
33
34impl RuntimeServiceImpl {
35 pub fn new(burst_runner: Arc<RwLock<BurstLoopRunner>>) -> Self {
37 Self {
38 burst_runner,
39 paused: Arc::new(RwLock::new(false)),
40 }
41 }
42}
43
44#[async_trait]
45impl RuntimeService for RuntimeServiceImpl {
46 async fn start(&self) -> ServiceResult<()> {
47 info!(target: "feagi-services", "Starting burst engine");
48
49 let mut runner = self.burst_runner.write();
50
51 runner
52 .start()
53 .map_err(|e| ServiceError::InvalidState(e.to_string()))?;
54
55 *self.paused.write() = false;
57
58 Ok(())
59 }
60
61 async fn stop(&self) -> ServiceResult<()> {
62 info!(target: "feagi-services", "Stopping burst engine");
63
64 let mut runner = self.burst_runner.write();
65 runner
66 .stop_strict()
67 .map_err(|e| ServiceError::Backend(format!("Failed to stop burst engine: {}", e)))?;
68
69 *self.paused.write() = false;
71
72 Ok(())
73 }
74
75 async fn pause(&self) -> ServiceResult<()> {
76 info!(target: "feagi-services", "Pausing burst engine");
77
78 let runner = self.burst_runner.read();
79 if !runner.is_running() {
80 return Err(ServiceError::InvalidState(
81 "Burst engine is not running".to_string(),
82 ));
83 }
84
85 *self.paused.write() = true;
87
88 warn!(target: "feagi-services", "Pause not yet implemented in BurstLoopRunner - using flag only");
91
92 Ok(())
93 }
94
95 async fn resume(&self) -> ServiceResult<()> {
96 info!(target: "feagi-services", "Resuming burst engine");
97
98 let paused = *self.paused.read();
99 if !paused {
100 return Err(ServiceError::InvalidState(
101 "Burst engine is not paused".to_string(),
102 ));
103 }
104
105 *self.paused.write() = false;
107
108 warn!(target: "feagi-services", "Resume not yet implemented in BurstLoopRunner - using flag only");
110
111 Ok(())
112 }
113
114 async fn step(&self) -> ServiceResult<()> {
115 info!(target: "feagi-services", "Executing single burst step");
116
117 let runner = self.burst_runner.read();
118 if runner.is_running() {
119 return Err(ServiceError::InvalidState(
120 "Cannot step while burst engine is running in continuous mode".to_string(),
121 ));
122 }
123
124 warn!(target: "feagi-services", "Single-step execution not yet implemented in BurstLoopRunner");
126
127 Err(ServiceError::NotImplemented(
128 "Single-step execution not yet implemented".to_string(),
129 ))
130 }
131
132 async fn get_status(&self) -> ServiceResult<RuntimeStatus> {
133 let runner = self.burst_runner.read();
134 let is_running = runner.is_running();
135 let burst_count = runner.get_burst_count();
136 let is_paused = *self.paused.read();
137
138 Ok(RuntimeStatus {
143 is_running,
144 is_paused,
145 frequency_hz: runner.get_frequency(),
146 burst_count,
147 current_rate_hz: if is_running {
148 runner.get_frequency()
149 } else {
150 0.0
151 },
152 last_burst_neuron_count: 0, avg_burst_time_ms: 0.0, })
155 }
156
157 async fn set_frequency(&self, frequency_hz: f64) -> ServiceResult<()> {
158 if frequency_hz <= 0.0 {
159 return Err(ServiceError::InvalidInput(
160 "Frequency must be greater than 0".to_string(),
161 ));
162 }
163
164 info!(target: "feagi-services", "Setting burst frequency to {} Hz", frequency_hz);
165
166 let mut runner = self.burst_runner.write();
167 runner.set_frequency(frequency_hz);
168
169 Ok(())
170 }
171
172 async fn get_burst_count(&self) -> ServiceResult<u64> {
173 let runner = self.burst_runner.read();
174 Ok(runner.get_burst_count())
175 }
176
177 async fn reset_burst_count(&self) -> ServiceResult<()> {
178 info!(target: "feagi-services", "Resetting burst count");
179
180 warn!(target: "feagi-services", "Burst count reset not yet implemented in BurstLoopRunner");
182
183 Err(ServiceError::NotImplemented(
184 "Burst count reset not yet implemented".to_string(),
185 ))
186 }
187
188 async fn register_motor_subscriptions(
189 &self,
190 agent_id: &str,
191 cortical_ids: Vec<String>,
192 rate_hz: f64,
193 ) -> ServiceResult<()> {
194 if rate_hz <= 0.0 {
195 return Err(ServiceError::InvalidInput(
196 "Motor rate must be greater than 0".to_string(),
197 ));
198 }
199
200 let cortical_set: AHashSet<String> = cortical_ids.into_iter().collect();
201 let runner = self.burst_runner.read();
202 runner
203 .register_motor_subscriptions_with_rate(agent_id.to_string(), cortical_set, rate_hz)
204 .map_err(|e| ServiceError::InvalidInput(e.to_string()))
205 }
206
207 async fn register_visualization_subscriptions(
208 &self,
209 agent_id: &str,
210 rate_hz: f64,
211 ) -> ServiceResult<()> {
212 if rate_hz <= 0.0 {
213 return Err(ServiceError::InvalidInput(
214 "Visualization rate must be greater than 0".to_string(),
215 ));
216 }
217
218 let runner = self.burst_runner.read();
219 runner
220 .register_visualization_subscriptions_with_rate(agent_id.to_string(), rate_hz)
221 .map_err(|e| ServiceError::InvalidInput(e.to_string()))
222 }
223
224 fn unregister_motor_subscriptions(&self, agent_id: &str) {
225 let runner = self.burst_runner.read();
226 runner.unregister_motor_subscriptions(agent_id);
227 }
228
229 fn unregister_visualization_subscriptions(&self, agent_id: &str) {
230 let runner = self.burst_runner.read();
231 runner.unregister_visualization_subscriptions(agent_id);
232 }
233
234 fn clear_all_motor_subscriptions(&self) {
235 let runner = self.burst_runner.read();
236 runner.clear_all_motor_subscriptions();
237 }
238
239 fn clear_all_visualization_subscriptions(&self) {
240 let runner = self.burst_runner.read();
241 runner.clear_all_visualization_subscriptions();
242 }
243
244 async fn get_fcl_snapshot(&self) -> ServiceResult<Vec<(u64, f32)>> {
245 let runner = self.burst_runner.read();
246 let fcl_data = runner.get_fcl_snapshot();
247
248 let result = fcl_data
250 .iter()
251 .map(|(neuron_id, potential)| (neuron_id.0 as u64, *potential))
252 .collect();
253
254 Ok(result)
255 }
256
257 async fn get_fcl_snapshot_with_cortical_idx(&self) -> ServiceResult<Vec<(u64, u32, f32)>> {
258 let runner = self.burst_runner.read();
259 let npu = runner.get_npu();
260
261 let lock_start = std::time::Instant::now();
264 let thread_id = std::thread::current().id();
265 debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} attempting NPU lock for get_fcl_snapshot_with_cortical_idx at {:?}", thread_id, lock_start);
266 let result: Vec<(u64, u32, f32)> = {
267 let npu_lock = npu.lock().unwrap();
269 let lock_acquired = std::time::Instant::now();
270 let lock_wait = lock_acquired.duration_since(lock_start);
271 debug!(
272 "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} acquired lock after {:.2}ms wait for get_fcl_snapshot_with_cortical_idx",
273 thread_id,
274 lock_wait.as_secs_f64() * 1000.0
275 );
276
277 let fcl_data = npu_lock
279 .get_last_fcl_snapshot_with_cortical_idx()
280 .map_err(|e| {
281 ServiceError::Internal(format!("Failed to resolve FCL cortical_idx: {e}"))
282 })?;
283 debug!(
284 "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} got FCL snapshot ({} neurons) with cortical_idx",
285 thread_id,
286 fcl_data.len()
287 );
288
289 fcl_data
290 .into_iter()
291 .map(|(neuron_id, cortical_idx, potential)| {
292 (neuron_id.0 as u64, cortical_idx, potential)
293 })
294 .collect()
295 }; let lock_released = std::time::Instant::now();
297 let _lock_hold_duration = lock_released.duration_since(lock_start);
298 debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} RELEASED NPU lock after get_fcl_snapshot_with_cortical_idx (total: {:.2}ms wait + {:.2}ms hold, {} neurons)",
299 thread_id,
300 lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
301 lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
302 result.len());
303
304 Ok(result)
305 }
306
307 async fn get_fire_queue_sample(
308 &self,
309 ) -> ServiceResult<
310 std::collections::HashMap<u32, (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>, Vec<f32>)>,
311 > {
312 let mut runner = self.burst_runner.write();
313
314 match runner.get_fire_queue_sample() {
315 Some(sample) => {
316 let result: std::collections::HashMap<_, _> = sample.into_iter().collect();
318 Ok(result)
319 }
320 None => Ok(std::collections::HashMap::new()),
321 }
322 }
323
324 async fn get_fire_ledger_configs(&self) -> ServiceResult<Vec<(u32, usize)>> {
325 debug!("[NPU-LOCK] RUNTIME-SERVICE: get_fire_ledger_configs() called - this acquires NPU lock!");
326 let runner = self.burst_runner.read();
327 let configs = runner.get_fire_ledger_configs();
328 Ok(configs)
329 }
330
331 async fn configure_fire_ledger_window(
332 &self,
333 cortical_idx: u32,
334 window_size: usize,
335 ) -> ServiceResult<()> {
336 let mut runner = self.burst_runner.write();
337 runner
338 .configure_fire_ledger_window(cortical_idx, window_size)
339 .map_err(|e| {
340 ServiceError::Internal(format!("Failed to configure fire ledger window: {e}"))
341 })?;
342
343 info!(target: "feagi-services", "Configured Fire Ledger window for area {}: {} bursts",
344 cortical_idx, window_size);
345
346 Ok(())
347 }
348
349 async fn get_fcl_sampler_config(&self) -> ServiceResult<(f64, u32)> {
350 let runner = self.burst_runner.read();
351 Ok(runner.get_fcl_sampler_config())
352 }
353
354 async fn set_fcl_sampler_config(
355 &self,
356 frequency: Option<f64>,
357 consumer: Option<u32>,
358 ) -> ServiceResult<()> {
359 let runner = self.burst_runner.read();
360 runner.set_fcl_sampler_config(frequency, consumer);
361 Ok(())
362 }
363
364 async fn get_area_fcl_sample_rate(&self, area_id: u32) -> ServiceResult<f64> {
365 let runner = self.burst_runner.read();
366 Ok(runner.get_area_fcl_sample_rate(area_id))
367 }
368
369 async fn set_area_fcl_sample_rate(&self, area_id: u32, sample_rate: f64) -> ServiceResult<()> {
370 if sample_rate <= 0.0 || sample_rate > 1000.0 {
371 return Err(ServiceError::InvalidInput(
372 "Sample rate must be between 0 and 1000 Hz".to_string(),
373 ));
374 }
375
376 let runner = self.burst_runner.read();
377 runner.set_area_fcl_sample_rate(area_id, sample_rate);
378
379 info!(target: "feagi-services", "Set FCL sample rate for area {} to {}Hz", area_id, sample_rate);
380 Ok(())
381 }
382
383 async fn inject_sensory_by_coordinates(
384 &self,
385 cortical_id: &str,
386 xyzp_data: &[(u32, u32, u32, f32)],
387 mode: ManualStimulationMode,
388 ) -> ServiceResult<usize> {
389 let cortical_id_typed = CorticalID::try_from_base_64(cortical_id).map_err(|e| {
391 ServiceError::InvalidInput(format!("Invalid cortical ID format: {}", e))
392 })?;
393
394 let runner = self.burst_runner.read();
396 let npu = runner.get_npu();
397
398 let lock_start = std::time::Instant::now();
400 debug!(
401 "[NPU-LOCK] RUNTIME-SERVICE: Acquiring lock for manual stimulation ({} coordinates) - THIS CAN BLOCK BURST LOOP!",
402 xyzp_data.len()
403 );
404 let injected_count = {
405 let mut npu_lock = npu
406 .lock()
407 .map_err(|e| ServiceError::Backend(format!("Failed to lock NPU: {}", e)))?;
408 let lock_wait = lock_start.elapsed();
409 debug!(
410 "[NPU-LOCK] RUNTIME-SERVICE: Lock acquired for manual stimulation (waited {:.2}ms)",
411 lock_wait.as_secs_f64() * 1000.0
412 );
413 let result = match mode {
414 ManualStimulationMode::Candidate => {
415 npu_lock.inject_sensory_xyzp_by_id(&cortical_id_typed, xyzp_data)
416 }
417 ManualStimulationMode::ForceFire => {
418 npu_lock.inject_force_fire_by_coordinates(&cortical_id_typed, xyzp_data)
419 }
420 };
421 let lock_hold_duration = lock_start.elapsed();
422 debug!(
423 "[NPU-LOCK] RUNTIME-SERVICE: Releasing lock after manual stimulation (held for {:.2}ms)",
424 lock_hold_duration.as_secs_f64() * 1000.0
425 );
426 result
427 };
428
429 if injected_count == 0 && !xyzp_data.is_empty() {
430 warn!(target: "feagi-services",
431 "No neurons found for injection: cortical_id={}, coordinates={}",
432 cortical_id, xyzp_data.len());
433 }
434
435 Ok(injected_count)
436 }
437}