1use std::sync::Arc;
14
15use ahash::AHashSet;
16use async_trait::async_trait;
17use feagi_npu_burst_engine::BurstLoopRunner;
18use feagi_structures::genomic::cortical_area::CorticalID;
19use parking_lot::RwLock;
20use tracing::{debug, info, warn};
21
22use crate::traits::runtime_service::ManualStimulationMode;
23use crate::traits::RuntimeService;
24use crate::types::{RuntimeStatus, ServiceError, ServiceResult};
25
26pub struct RuntimeServiceImpl {
30 burst_runner: Arc<RwLock<BurstLoopRunner>>,
31 paused: Arc<RwLock<bool>>,
32}
33
34impl RuntimeServiceImpl {
35 pub fn new(burst_runner: Arc<RwLock<BurstLoopRunner>>) -> Self {
37 Self {
38 burst_runner,
39 paused: Arc::new(RwLock::new(false)),
40 }
41 }
42}
43
44#[async_trait]
45impl RuntimeService for RuntimeServiceImpl {
46 async fn start(&self) -> ServiceResult<()> {
47 info!(target: "feagi-services", "Starting burst engine");
48
49 let mut runner = self.burst_runner.write();
50
51 runner
52 .start()
53 .map_err(|e| ServiceError::InvalidState(e.to_string()))?;
54
55 *self.paused.write() = false;
57
58 Ok(())
59 }
60
61 async fn stop(&self) -> ServiceResult<()> {
62 info!(target: "feagi-services", "Stopping burst engine");
63
64 let mut runner = self.burst_runner.write();
65 runner.stop();
66
67 *self.paused.write() = false;
69
70 Ok(())
71 }
72
73 async fn pause(&self) -> ServiceResult<()> {
74 info!(target: "feagi-services", "Pausing burst engine");
75
76 let runner = self.burst_runner.read();
77 if !runner.is_running() {
78 return Err(ServiceError::InvalidState(
79 "Burst engine is not running".to_string(),
80 ));
81 }
82
83 *self.paused.write() = true;
85
86 warn!(target: "feagi-services", "Pause not yet implemented in BurstLoopRunner - using flag only");
89
90 Ok(())
91 }
92
93 async fn resume(&self) -> ServiceResult<()> {
94 info!(target: "feagi-services", "Resuming burst engine");
95
96 let paused = *self.paused.read();
97 if !paused {
98 return Err(ServiceError::InvalidState(
99 "Burst engine is not paused".to_string(),
100 ));
101 }
102
103 *self.paused.write() = false;
105
106 warn!(target: "feagi-services", "Resume not yet implemented in BurstLoopRunner - using flag only");
108
109 Ok(())
110 }
111
112 async fn step(&self) -> ServiceResult<()> {
113 info!(target: "feagi-services", "Executing single burst step");
114
115 let runner = self.burst_runner.read();
116 if runner.is_running() {
117 return Err(ServiceError::InvalidState(
118 "Cannot step while burst engine is running in continuous mode".to_string(),
119 ));
120 }
121
122 warn!(target: "feagi-services", "Single-step execution not yet implemented in BurstLoopRunner");
124
125 Err(ServiceError::NotImplemented(
126 "Single-step execution not yet implemented".to_string(),
127 ))
128 }
129
130 async fn get_status(&self) -> ServiceResult<RuntimeStatus> {
131 let runner = self.burst_runner.read();
132 let is_running = runner.is_running();
133 let burst_count = runner.get_burst_count();
134 let is_paused = *self.paused.read();
135
136 Ok(RuntimeStatus {
141 is_running,
142 is_paused,
143 frequency_hz: runner.get_frequency(),
144 burst_count,
145 current_rate_hz: if is_running {
146 runner.get_frequency()
147 } else {
148 0.0
149 },
150 last_burst_neuron_count: 0, avg_burst_time_ms: 0.0, })
153 }
154
155 async fn set_frequency(&self, frequency_hz: f64) -> ServiceResult<()> {
156 if frequency_hz <= 0.0 {
157 return Err(ServiceError::InvalidInput(
158 "Frequency must be greater than 0".to_string(),
159 ));
160 }
161
162 info!(target: "feagi-services", "Setting burst frequency to {} Hz", frequency_hz);
163
164 let mut runner = self.burst_runner.write();
165 runner.set_frequency(frequency_hz);
166
167 Ok(())
168 }
169
170 async fn get_burst_count(&self) -> ServiceResult<u64> {
171 let runner = self.burst_runner.read();
172 Ok(runner.get_burst_count())
173 }
174
175 async fn reset_burst_count(&self) -> ServiceResult<()> {
176 info!(target: "feagi-services", "Resetting burst count");
177
178 warn!(target: "feagi-services", "Burst count reset not yet implemented in BurstLoopRunner");
180
181 Err(ServiceError::NotImplemented(
182 "Burst count reset not yet implemented".to_string(),
183 ))
184 }
185
186 async fn register_motor_subscriptions(
187 &self,
188 agent_id: &str,
189 cortical_ids: Vec<String>,
190 rate_hz: f64,
191 ) -> ServiceResult<()> {
192 if rate_hz <= 0.0 {
193 return Err(ServiceError::InvalidInput(
194 "Motor rate must be greater than 0".to_string(),
195 ));
196 }
197
198 let cortical_set: AHashSet<String> = cortical_ids.into_iter().collect();
199 let runner = self.burst_runner.read();
200 runner
201 .register_motor_subscriptions_with_rate(agent_id.to_string(), cortical_set, rate_hz)
202 .map_err(|e| ServiceError::InvalidInput(e.to_string()))
203 }
204
205 async fn register_visualization_subscriptions(
206 &self,
207 agent_id: &str,
208 rate_hz: f64,
209 ) -> ServiceResult<()> {
210 if rate_hz <= 0.0 {
211 return Err(ServiceError::InvalidInput(
212 "Visualization rate must be greater than 0".to_string(),
213 ));
214 }
215
216 let runner = self.burst_runner.read();
217 runner
218 .register_visualization_subscriptions_with_rate(agent_id.to_string(), rate_hz)
219 .map_err(|e| ServiceError::InvalidInput(e.to_string()))
220 }
221
222 fn unregister_motor_subscriptions(&self, agent_id: &str) {
223 let runner = self.burst_runner.read();
224 runner.unregister_motor_subscriptions(agent_id);
225 }
226
227 fn unregister_visualization_subscriptions(&self, agent_id: &str) {
228 let runner = self.burst_runner.read();
229 runner.unregister_visualization_subscriptions(agent_id);
230 }
231
232 async fn get_fcl_snapshot(&self) -> ServiceResult<Vec<(u64, f32)>> {
233 let runner = self.burst_runner.read();
234 let fcl_data = runner.get_fcl_snapshot();
235
236 let result = fcl_data
238 .iter()
239 .map(|(neuron_id, potential)| (neuron_id.0 as u64, *potential))
240 .collect();
241
242 Ok(result)
243 }
244
245 async fn get_fcl_snapshot_with_cortical_idx(&self) -> ServiceResult<Vec<(u64, u32, f32)>> {
246 let runner = self.burst_runner.read();
247 let npu = runner.get_npu();
248
249 let lock_start = std::time::Instant::now();
252 let thread_id = std::thread::current().id();
253 debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} attempting NPU lock for get_fcl_snapshot_with_cortical_idx at {:?}", thread_id, lock_start);
254 let result: Vec<(u64, u32, f32)> = {
255 let npu_lock = npu.lock().unwrap();
257 let lock_acquired = std::time::Instant::now();
258 let lock_wait = lock_acquired.duration_since(lock_start);
259 debug!(
260 "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} acquired lock after {:.2}ms wait for get_fcl_snapshot_with_cortical_idx",
261 thread_id,
262 lock_wait.as_secs_f64() * 1000.0
263 );
264
265 let fcl_data = npu_lock
267 .get_last_fcl_snapshot_with_cortical_idx()
268 .map_err(|e| {
269 ServiceError::Internal(format!("Failed to resolve FCL cortical_idx: {e}"))
270 })?;
271 debug!(
272 "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} got FCL snapshot ({} neurons) with cortical_idx",
273 thread_id,
274 fcl_data.len()
275 );
276
277 fcl_data
278 .into_iter()
279 .map(|(neuron_id, cortical_idx, potential)| {
280 (neuron_id.0 as u64, cortical_idx, potential)
281 })
282 .collect()
283 }; let lock_released = std::time::Instant::now();
285 let _lock_hold_duration = lock_released.duration_since(lock_start);
286 debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} RELEASED NPU lock after get_fcl_snapshot_with_cortical_idx (total: {:.2}ms wait + {:.2}ms hold, {} neurons)",
287 thread_id,
288 lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
289 lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
290 result.len());
291
292 Ok(result)
293 }
294
295 async fn get_fire_queue_sample(
296 &self,
297 ) -> ServiceResult<
298 std::collections::HashMap<u32, (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>, Vec<f32>)>,
299 > {
300 let mut runner = self.burst_runner.write();
301
302 match runner.get_fire_queue_sample() {
303 Some(sample) => {
304 let result: std::collections::HashMap<_, _> = sample.into_iter().collect();
306 Ok(result)
307 }
308 None => Ok(std::collections::HashMap::new()),
309 }
310 }
311
312 async fn get_fire_ledger_configs(&self) -> ServiceResult<Vec<(u32, usize)>> {
313 debug!("[NPU-LOCK] RUNTIME-SERVICE: get_fire_ledger_configs() called - this acquires NPU lock!");
314 let runner = self.burst_runner.read();
315 let configs = runner.get_fire_ledger_configs();
316 Ok(configs)
317 }
318
319 async fn configure_fire_ledger_window(
320 &self,
321 cortical_idx: u32,
322 window_size: usize,
323 ) -> ServiceResult<()> {
324 let mut runner = self.burst_runner.write();
325 runner
326 .configure_fire_ledger_window(cortical_idx, window_size)
327 .map_err(|e| {
328 ServiceError::Internal(format!("Failed to configure fire ledger window: {e}"))
329 })?;
330
331 info!(target: "feagi-services", "Configured Fire Ledger window for area {}: {} bursts",
332 cortical_idx, window_size);
333
334 Ok(())
335 }
336
337 async fn get_fcl_sampler_config(&self) -> ServiceResult<(f64, u32)> {
338 let runner = self.burst_runner.read();
339 Ok(runner.get_fcl_sampler_config())
340 }
341
342 async fn set_fcl_sampler_config(
343 &self,
344 frequency: Option<f64>,
345 consumer: Option<u32>,
346 ) -> ServiceResult<()> {
347 let runner = self.burst_runner.read();
348 runner.set_fcl_sampler_config(frequency, consumer);
349 Ok(())
350 }
351
352 async fn get_area_fcl_sample_rate(&self, area_id: u32) -> ServiceResult<f64> {
353 let runner = self.burst_runner.read();
354 Ok(runner.get_area_fcl_sample_rate(area_id))
355 }
356
357 async fn set_area_fcl_sample_rate(&self, area_id: u32, sample_rate: f64) -> ServiceResult<()> {
358 if sample_rate <= 0.0 || sample_rate > 1000.0 {
359 return Err(ServiceError::InvalidInput(
360 "Sample rate must be between 0 and 1000 Hz".to_string(),
361 ));
362 }
363
364 let runner = self.burst_runner.read();
365 runner.set_area_fcl_sample_rate(area_id, sample_rate);
366
367 info!(target: "feagi-services", "Set FCL sample rate for area {} to {}Hz", area_id, sample_rate);
368 Ok(())
369 }
370
371 async fn inject_sensory_by_coordinates(
372 &self,
373 cortical_id: &str,
374 xyzp_data: &[(u32, u32, u32, f32)],
375 mode: ManualStimulationMode,
376 ) -> ServiceResult<usize> {
377 let cortical_id_typed = CorticalID::try_from_base_64(cortical_id).map_err(|e| {
379 ServiceError::InvalidInput(format!("Invalid cortical ID format: {}", e))
380 })?;
381
382 let runner = self.burst_runner.read();
384 let npu = runner.get_npu();
385
386 let lock_start = std::time::Instant::now();
388 debug!(
389 "[NPU-LOCK] RUNTIME-SERVICE: Acquiring lock for manual stimulation ({} coordinates) - THIS CAN BLOCK BURST LOOP!",
390 xyzp_data.len()
391 );
392 let injected_count = {
393 let mut npu_lock = npu
394 .lock()
395 .map_err(|e| ServiceError::Backend(format!("Failed to lock NPU: {}", e)))?;
396 let lock_wait = lock_start.elapsed();
397 debug!(
398 "[NPU-LOCK] RUNTIME-SERVICE: Lock acquired for manual stimulation (waited {:.2}ms)",
399 lock_wait.as_secs_f64() * 1000.0
400 );
401 let result = match mode {
402 ManualStimulationMode::Candidate => {
403 npu_lock.inject_sensory_xyzp_by_id(&cortical_id_typed, xyzp_data)
404 }
405 ManualStimulationMode::ForceFire => {
406 npu_lock.inject_force_fire_by_coordinates(&cortical_id_typed, xyzp_data)
407 }
408 };
409 let lock_hold_duration = lock_start.elapsed();
410 debug!(
411 "[NPU-LOCK] RUNTIME-SERVICE: Releasing lock after manual stimulation (held for {:.2}ms)",
412 lock_hold_duration.as_secs_f64() * 1000.0
413 );
414 result
415 };
416
417 if injected_count == 0 && !xyzp_data.is_empty() {
418 warn!(target: "feagi-services",
419 "No neurons found for injection: cortical_id={}, coordinates={}",
420 cortical_id, xyzp_data.len());
421 } else if injected_count > 0 {
422 info!(target: "feagi-services",
423 "Injected {} neurons into FCL for cortical area {}",
424 injected_count, cortical_id);
425 }
426
427 Ok(injected_count)
428 }
429}