1use std::sync::Arc;
14
15use ahash::AHashSet;
16use async_trait::async_trait;
17use feagi_npu_burst_engine::BurstLoopRunner;
18use feagi_structures::genomic::cortical_area::CorticalID;
19use parking_lot::RwLock;
20use tracing::{debug, info, warn};
21
22use crate::traits::runtime_service::ManualStimulationMode;
23use crate::traits::RuntimeService;
24use crate::types::{RuntimeStatus, ServiceError, ServiceResult};
25
26pub struct RuntimeServiceImpl {
30 burst_runner: Arc<RwLock<BurstLoopRunner>>,
31 paused: Arc<RwLock<bool>>,
32 plasticity_service: Option<Arc<dyn std::any::Any + Send + Sync>>,
33}
34
35impl RuntimeServiceImpl {
36 pub fn new(burst_runner: Arc<RwLock<BurstLoopRunner>>) -> Self {
38 Self {
39 burst_runner,
40 paused: Arc::new(RwLock::new(false)),
41 plasticity_service: None,
42 }
43 }
44
45 #[cfg(feature = "plasticity")]
47 pub fn new_with_plasticity(
48 burst_runner: Arc<RwLock<BurstLoopRunner>>,
49 plasticity_service: Arc<feagi_npu_plasticity::PlasticityService>,
50 ) -> Self {
51 Self {
52 burst_runner,
53 paused: Arc::new(RwLock::new(false)),
54 plasticity_service: Some(plasticity_service as Arc<dyn std::any::Any + Send + Sync>),
55 }
56 }
57
58 #[cfg(not(feature = "plasticity"))]
60 pub fn new_with_plasticity(
61 burst_runner: Arc<RwLock<BurstLoopRunner>>,
62 _plasticity_service: Arc<dyn std::any::Any + Send + Sync>,
63 ) -> Self {
64 Self {
65 burst_runner,
66 paused: Arc::new(RwLock::new(false)),
67 plasticity_service: None,
68 }
69 }
70}
71
72#[async_trait]
73impl RuntimeService for RuntimeServiceImpl {
74 async fn start(&self) -> ServiceResult<()> {
75 info!(target: "feagi-services", "Starting burst engine");
76
77 let mut runner = self.burst_runner.write();
78
79 runner
80 .start()
81 .map_err(|e| ServiceError::InvalidState(e.to_string()))?;
82
83 *self.paused.write() = false;
85
86 Ok(())
87 }
88
89 async fn stop(&self) -> ServiceResult<()> {
90 info!(target: "feagi-services", "Stopping burst engine");
91
92 let mut runner = self.burst_runner.write();
93 runner
94 .stop_strict()
95 .map_err(|e| ServiceError::Backend(format!("Failed to stop burst engine: {}", e)))?;
96
97 *self.paused.write() = false;
99
100 Ok(())
101 }
102
103 async fn pause(&self) -> ServiceResult<()> {
104 info!(target: "feagi-services", "Pausing burst engine");
105
106 let runner = self.burst_runner.read();
107 if !runner.is_running() {
108 return Err(ServiceError::InvalidState(
109 "Burst engine is not running".to_string(),
110 ));
111 }
112
113 *self.paused.write() = true;
115
116 warn!(target: "feagi-services", "Pause not yet implemented in BurstLoopRunner - using flag only");
119
120 Ok(())
121 }
122
123 async fn resume(&self) -> ServiceResult<()> {
124 info!(target: "feagi-services", "Resuming burst engine");
125
126 let paused = *self.paused.read();
127 if !paused {
128 return Err(ServiceError::InvalidState(
129 "Burst engine is not paused".to_string(),
130 ));
131 }
132
133 *self.paused.write() = false;
135
136 warn!(target: "feagi-services", "Resume not yet implemented in BurstLoopRunner - using flag only");
138
139 Ok(())
140 }
141
142 async fn step(&self) -> ServiceResult<()> {
143 info!(target: "feagi-services", "Executing single burst step");
144
145 let runner = self.burst_runner.read();
146 if runner.is_running() {
147 return Err(ServiceError::InvalidState(
148 "Cannot step while burst engine is running in continuous mode".to_string(),
149 ));
150 }
151
152 warn!(target: "feagi-services", "Single-step execution not yet implemented in BurstLoopRunner");
154
155 Err(ServiceError::NotImplemented(
156 "Single-step execution not yet implemented".to_string(),
157 ))
158 }
159
160 async fn get_status(&self) -> ServiceResult<RuntimeStatus> {
161 let runner = self.burst_runner.read();
162 let is_running = runner.is_running();
163 let burst_count = runner.get_burst_count();
164 let is_paused = *self.paused.read();
165
166 Ok(RuntimeStatus {
171 is_running,
172 is_paused,
173 frequency_hz: runner.get_frequency(),
174 burst_count,
175 current_rate_hz: if is_running {
176 runner.get_frequency()
177 } else {
178 0.0
179 },
180 last_burst_neuron_count: 0, avg_burst_time_ms: 0.0, })
183 }
184
185 async fn set_frequency(&self, frequency_hz: f64) -> ServiceResult<()> {
186 if frequency_hz <= 0.0 {
187 return Err(ServiceError::InvalidInput(
188 "Frequency must be greater than 0".to_string(),
189 ));
190 }
191
192 info!(target: "feagi-services", "Setting burst frequency to {} Hz", frequency_hz);
193
194 let mut runner = self.burst_runner.write();
195 runner.set_frequency(frequency_hz);
196
197 Ok(())
198 }
199
200 async fn get_burst_count(&self) -> ServiceResult<u64> {
201 let runner = self.burst_runner.read();
202 Ok(runner.get_burst_count())
203 }
204
205 async fn reset_burst_count(&self) -> ServiceResult<()> {
206 info!(target: "feagi-services", "Resetting burst count");
207
208 warn!(target: "feagi-services", "Burst count reset not yet implemented in BurstLoopRunner");
210
211 Err(ServiceError::NotImplemented(
212 "Burst count reset not yet implemented".to_string(),
213 ))
214 }
215
216 async fn register_motor_subscriptions(
217 &self,
218 agent_id: &str,
219 cortical_ids: Vec<String>,
220 rate_hz: f64,
221 ) -> ServiceResult<()> {
222 if rate_hz <= 0.0 {
223 return Err(ServiceError::InvalidInput(
224 "Motor rate must be greater than 0".to_string(),
225 ));
226 }
227
228 let cortical_set: AHashSet<String> = cortical_ids.into_iter().collect();
229 let runner = self.burst_runner.read();
230 runner
231 .register_motor_subscriptions_with_rate(agent_id.to_string(), cortical_set, rate_hz)
232 .map_err(|e| ServiceError::InvalidInput(e.to_string()))
233 }
234
235 async fn register_visualization_subscriptions(
236 &self,
237 agent_id: &str,
238 rate_hz: f64,
239 ) -> ServiceResult<()> {
240 if rate_hz <= 0.0 {
241 return Err(ServiceError::InvalidInput(
242 "Visualization rate must be greater than 0".to_string(),
243 ));
244 }
245
246 let runner = self.burst_runner.read();
247 runner
248 .register_visualization_subscriptions_with_rate(agent_id.to_string(), rate_hz)
249 .map_err(|e| ServiceError::InvalidInput(e.to_string()))
250 }
251
252 fn unregister_motor_subscriptions(&self, agent_id: &str) {
253 let runner = self.burst_runner.read();
254 runner.unregister_motor_subscriptions(agent_id);
255 }
256
257 fn unregister_visualization_subscriptions(&self, agent_id: &str) {
258 let runner = self.burst_runner.read();
259 runner.unregister_visualization_subscriptions(agent_id);
260 }
261
262 fn clear_all_motor_subscriptions(&self) {
263 let runner = self.burst_runner.read();
264 runner.clear_all_motor_subscriptions();
265 }
266
267 fn clear_all_visualization_subscriptions(&self) {
268 let runner = self.burst_runner.read();
269 runner.clear_all_visualization_subscriptions();
270 }
271
272 async fn get_fcl_snapshot(&self) -> ServiceResult<Vec<(u64, f32)>> {
273 let runner = self.burst_runner.read();
274 let fcl_data = runner.get_fcl_snapshot();
275
276 let result = fcl_data
278 .iter()
279 .map(|(neuron_id, potential)| (neuron_id.0 as u64, *potential))
280 .collect();
281
282 Ok(result)
283 }
284
285 async fn get_fcl_snapshot_with_cortical_idx(&self) -> ServiceResult<Vec<(u64, u32, f32)>> {
286 let runner = self.burst_runner.read();
287 let npu = runner.get_npu();
288
289 let lock_start = std::time::Instant::now();
292 let thread_id = std::thread::current().id();
293 debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} attempting NPU lock for get_fcl_snapshot_with_cortical_idx at {:?}", thread_id, lock_start);
294 let result: Vec<(u64, u32, f32)> = {
295 let npu_lock = npu.lock().unwrap();
297 let lock_acquired = std::time::Instant::now();
298 let lock_wait = lock_acquired.duration_since(lock_start);
299 debug!(
300 "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} acquired lock after {:.2}ms wait for get_fcl_snapshot_with_cortical_idx",
301 thread_id,
302 lock_wait.as_secs_f64() * 1000.0
303 );
304
305 let fcl_data = npu_lock
307 .get_last_fcl_snapshot_with_cortical_idx()
308 .map_err(|e| {
309 ServiceError::Internal(format!("Failed to resolve FCL cortical_idx: {e}"))
310 })?;
311 debug!(
312 "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} got FCL snapshot ({} neurons) with cortical_idx",
313 thread_id,
314 fcl_data.len()
315 );
316
317 fcl_data
318 .into_iter()
319 .map(|(neuron_id, cortical_idx, potential)| {
320 (neuron_id.0 as u64, cortical_idx, potential)
321 })
322 .collect()
323 }; let lock_released = std::time::Instant::now();
325 let _lock_hold_duration = lock_released.duration_since(lock_start);
326 debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} RELEASED NPU lock after get_fcl_snapshot_with_cortical_idx (total: {:.2}ms wait + {:.2}ms hold, {} neurons)",
327 thread_id,
328 lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
329 lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
330 result.len());
331
332 Ok(result)
333 }
334
335 async fn get_fire_queue_sample(
336 &self,
337 ) -> ServiceResult<
338 std::collections::HashMap<u32, (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>, Vec<f32>)>,
339 > {
340 let mut runner = self.burst_runner.write();
341
342 match runner.get_fire_queue_sample() {
343 Some(sample) => {
344 let result: std::collections::HashMap<_, _> = sample.into_iter().collect();
346 Ok(result)
347 }
348 None => Ok(std::collections::HashMap::new()),
349 }
350 }
351
352 async fn get_fire_ledger_configs(&self) -> ServiceResult<Vec<(u32, usize)>> {
353 debug!("[NPU-LOCK] RUNTIME-SERVICE: get_fire_ledger_configs() called - this acquires NPU lock!");
354 let runner = self.burst_runner.read();
355 let configs = runner.get_fire_ledger_configs();
356 Ok(configs)
357 }
358
359 async fn configure_fire_ledger_window(
360 &self,
361 cortical_idx: u32,
362 window_size: usize,
363 ) -> ServiceResult<()> {
364 let mut runner = self.burst_runner.write();
365 runner
366 .configure_fire_ledger_window(cortical_idx, window_size)
367 .map_err(|e| {
368 ServiceError::Internal(format!("Failed to configure fire ledger window: {e}"))
369 })?;
370
371 info!(target: "feagi-services", "Configured Fire Ledger window for area {}: {} bursts",
372 cortical_idx, window_size);
373
374 Ok(())
375 }
376
377 async fn get_fcl_sampler_config(&self) -> ServiceResult<(f64, u32)> {
378 let runner = self.burst_runner.read();
379 Ok(runner.get_fcl_sampler_config())
380 }
381
382 async fn set_fcl_sampler_config(
383 &self,
384 frequency: Option<f64>,
385 consumer: Option<u32>,
386 ) -> ServiceResult<()> {
387 let runner = self.burst_runner.read();
388 runner.set_fcl_sampler_config(frequency, consumer);
389 Ok(())
390 }
391
392 async fn get_area_fcl_sample_rate(&self, area_id: u32) -> ServiceResult<f64> {
393 let runner = self.burst_runner.read();
394 Ok(runner.get_area_fcl_sample_rate(area_id))
395 }
396
397 async fn set_area_fcl_sample_rate(&self, area_id: u32, sample_rate: f64) -> ServiceResult<()> {
398 if sample_rate <= 0.0 || sample_rate > 1000.0 {
399 return Err(ServiceError::InvalidInput(
400 "Sample rate must be between 0 and 1000 Hz".to_string(),
401 ));
402 }
403
404 let runner = self.burst_runner.read();
405 runner.set_area_fcl_sample_rate(area_id, sample_rate);
406
407 info!(target: "feagi-services", "Set FCL sample rate for area {} to {}Hz", area_id, sample_rate);
408 Ok(())
409 }
410
411 async fn inject_sensory_by_coordinates(
412 &self,
413 cortical_id: &str,
414 xyzp_data: &[(u32, u32, u32, f32)],
415 mode: ManualStimulationMode,
416 ) -> ServiceResult<usize> {
417 let cortical_id_typed = CorticalID::try_from_base_64(cortical_id).map_err(|e| {
419 ServiceError::InvalidInput(format!("Invalid cortical ID format: {}", e))
420 })?;
421
422 let runner = self.burst_runner.read();
424 let npu = runner.get_npu();
425
426 let lock_start = std::time::Instant::now();
428 debug!(
429 "[NPU-LOCK] RUNTIME-SERVICE: Acquiring lock for manual stimulation ({} coordinates) - THIS CAN BLOCK BURST LOOP!",
430 xyzp_data.len()
431 );
432 let injected_count = {
433 let mut npu_lock = npu
434 .lock()
435 .map_err(|e| ServiceError::Backend(format!("Failed to lock NPU: {}", e)))?;
436 let lock_wait = lock_start.elapsed();
437 debug!(
438 "[NPU-LOCK] RUNTIME-SERVICE: Lock acquired for manual stimulation (waited {:.2}ms)",
439 lock_wait.as_secs_f64() * 1000.0
440 );
441 let result = match mode {
442 ManualStimulationMode::Candidate => {
443 npu_lock.inject_sensory_xyzp_by_id(&cortical_id_typed, xyzp_data)
444 }
445 ManualStimulationMode::ForceFire => {
446 npu_lock.inject_force_fire_by_coordinates(&cortical_id_typed, xyzp_data)
447 }
448 };
449 let lock_hold_duration = lock_start.elapsed();
450 debug!(
451 "[NPU-LOCK] RUNTIME-SERVICE: Releasing lock after manual stimulation (held for {:.2}ms)",
452 lock_hold_duration.as_secs_f64() * 1000.0
453 );
454 result
455 };
456
457 if injected_count == 0 && !xyzp_data.is_empty() {
458 warn!(target: "feagi-services",
459 "No neurons found for injection: cortical_id={}, coordinates={}",
460 cortical_id, xyzp_data.len());
461 }
462
463 Ok(injected_count)
464 }
465
466 async fn reset_cortical_area_states(
467 &self,
468 cortical_indices: &[u32],
469 ) -> ServiceResult<Vec<(u32, usize)>> {
470 if cortical_indices.is_empty() {
471 return Ok(Vec::new());
472 }
473
474 let runner = self.burst_runner.read();
475 let npu = runner.get_npu();
476 let mut npu_lock = npu
477 .lock()
478 .map_err(|e| ServiceError::Backend(format!("Failed to lock NPU: {e}")))?;
479
480 let mut results: Vec<(u32, usize)> = Vec::with_capacity(cortical_indices.len());
481 for &idx in cortical_indices {
482 let count = npu_lock.reset_cortical_area_runtime_state(idx);
483 results.push((idx, count));
484 }
485
486 drop(npu_lock);
488 drop(runner);
489
490 #[cfg(feature = "plasticity")]
492 if let Some(plasticity_any) = &self.plasticity_service {
493 if let Some(plasticity_service) =
494 plasticity_any.downcast_ref::<feagi_npu_plasticity::PlasticityService>()
495 {
496 for &idx in cortical_indices {
497 let memory_neuron_count = plasticity_service.reset_memory_neurons_in_area(idx);
498 if memory_neuron_count > 0 {
499 info!(
500 target: "feagi-services",
501 "Reset {} memory neurons in cortical area {}",
502 memory_neuron_count,
503 idx
504 );
505 }
506 }
507 }
508 }
509
510 Ok(results)
511 }
512}