1use std::sync::Arc;
14
15use async_trait::async_trait;
16use feagi_npu_burst_engine::BurstLoopRunner;
17use feagi_structures::genomic::cortical_area::CorticalID;
18use parking_lot::RwLock;
19use tracing::{debug, info, warn};
20
21use crate::traits::RuntimeService;
22use crate::types::{RuntimeStatus, ServiceError, ServiceResult};
23
24pub struct RuntimeServiceImpl {
28 burst_runner: Arc<RwLock<BurstLoopRunner>>,
29 paused: Arc<RwLock<bool>>,
30}
31
32impl RuntimeServiceImpl {
33 pub fn new(burst_runner: Arc<RwLock<BurstLoopRunner>>) -> Self {
35 Self {
36 burst_runner,
37 paused: Arc::new(RwLock::new(false)),
38 }
39 }
40}
41
42#[async_trait]
43impl RuntimeService for RuntimeServiceImpl {
44 async fn start(&self) -> ServiceResult<()> {
45 info!(target: "feagi-services", "Starting burst engine");
46
47 let mut runner = self.burst_runner.write();
48
49 runner
50 .start()
51 .map_err(|e| ServiceError::InvalidState(e.to_string()))?;
52
53 *self.paused.write() = false;
55
56 Ok(())
57 }
58
59 async fn stop(&self) -> ServiceResult<()> {
60 info!(target: "feagi-services", "Stopping burst engine");
61
62 let mut runner = self.burst_runner.write();
63 runner.stop();
64
65 *self.paused.write() = false;
67
68 Ok(())
69 }
70
71 async fn pause(&self) -> ServiceResult<()> {
72 info!(target: "feagi-services", "Pausing burst engine");
73
74 let runner = self.burst_runner.read();
75 if !runner.is_running() {
76 return Err(ServiceError::InvalidState(
77 "Burst engine is not running".to_string(),
78 ));
79 }
80
81 *self.paused.write() = true;
83
84 warn!(target: "feagi-services", "Pause not yet implemented in BurstLoopRunner - using flag only");
87
88 Ok(())
89 }
90
91 async fn resume(&self) -> ServiceResult<()> {
92 info!(target: "feagi-services", "Resuming burst engine");
93
94 let paused = *self.paused.read();
95 if !paused {
96 return Err(ServiceError::InvalidState(
97 "Burst engine is not paused".to_string(),
98 ));
99 }
100
101 *self.paused.write() = false;
103
104 warn!(target: "feagi-services", "Resume not yet implemented in BurstLoopRunner - using flag only");
106
107 Ok(())
108 }
109
110 async fn step(&self) -> ServiceResult<()> {
111 info!(target: "feagi-services", "Executing single burst step");
112
113 let runner = self.burst_runner.read();
114 if runner.is_running() {
115 return Err(ServiceError::InvalidState(
116 "Cannot step while burst engine is running in continuous mode".to_string(),
117 ));
118 }
119
120 warn!(target: "feagi-services", "Single-step execution not yet implemented in BurstLoopRunner");
122
123 Err(ServiceError::NotImplemented(
124 "Single-step execution not yet implemented".to_string(),
125 ))
126 }
127
128 async fn get_status(&self) -> ServiceResult<RuntimeStatus> {
129 let runner = self.burst_runner.read();
130 let is_running = runner.is_running();
131 let burst_count = runner.get_burst_count();
132 let is_paused = *self.paused.read();
133
134 Ok(RuntimeStatus {
139 is_running,
140 is_paused,
141 frequency_hz: runner.get_frequency(),
142 burst_count,
143 current_rate_hz: if is_running {
144 runner.get_frequency()
145 } else {
146 0.0
147 },
148 last_burst_neuron_count: 0, avg_burst_time_ms: 0.0, })
151 }
152
153 async fn set_frequency(&self, frequency_hz: f64) -> ServiceResult<()> {
154 if frequency_hz <= 0.0 {
155 return Err(ServiceError::InvalidInput(
156 "Frequency must be greater than 0".to_string(),
157 ));
158 }
159
160 info!(target: "feagi-services", "Setting burst frequency to {} Hz", frequency_hz);
161
162 let mut runner = self.burst_runner.write();
163 runner.set_frequency(frequency_hz);
164
165 Ok(())
166 }
167
168 async fn get_burst_count(&self) -> ServiceResult<u64> {
169 let runner = self.burst_runner.read();
170 Ok(runner.get_burst_count())
171 }
172
173 async fn reset_burst_count(&self) -> ServiceResult<()> {
174 info!(target: "feagi-services", "Resetting burst count");
175
176 warn!(target: "feagi-services", "Burst count reset not yet implemented in BurstLoopRunner");
178
179 Err(ServiceError::NotImplemented(
180 "Burst count reset not yet implemented".to_string(),
181 ))
182 }
183
184 async fn get_fcl_snapshot(&self) -> ServiceResult<Vec<(u64, f32)>> {
185 let runner = self.burst_runner.read();
186 let fcl_data = runner.get_fcl_snapshot();
187
188 let result = fcl_data
190 .iter()
191 .map(|(neuron_id, potential)| (neuron_id.0 as u64, *potential))
192 .collect();
193
194 Ok(result)
195 }
196
197 async fn get_fcl_snapshot_with_cortical_idx(&self) -> ServiceResult<Vec<(u64, u32, f32)>> {
198 let runner = self.burst_runner.read();
199 let npu = runner.get_npu();
200
201 let lock_start = std::time::Instant::now();
204 let thread_id = std::thread::current().id();
205 debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} attempting NPU lock for get_fcl_snapshot_with_cortical_idx at {:?}", thread_id, lock_start);
206 let result: Vec<(u64, u32, f32)> = {
207 let npu_lock = npu.lock().unwrap();
209 let lock_acquired = std::time::Instant::now();
210 let lock_wait = lock_acquired.duration_since(lock_start);
211 debug!(
212 "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} acquired lock after {:.2}ms wait for get_fcl_snapshot_with_cortical_idx",
213 thread_id,
214 lock_wait.as_secs_f64() * 1000.0
215 );
216
217 let fcl_data = npu_lock
219 .get_last_fcl_snapshot_with_cortical_idx()
220 .map_err(|e| {
221 ServiceError::Internal(format!("Failed to resolve FCL cortical_idx: {e}"))
222 })?;
223 debug!(
224 "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} got FCL snapshot ({} neurons) with cortical_idx",
225 thread_id,
226 fcl_data.len()
227 );
228
229 fcl_data
230 .into_iter()
231 .map(|(neuron_id, cortical_idx, potential)| {
232 (neuron_id.0 as u64, cortical_idx, potential)
233 })
234 .collect()
235 }; let lock_released = std::time::Instant::now();
237 let _lock_hold_duration = lock_released.duration_since(lock_start);
238 debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} RELEASED NPU lock after get_fcl_snapshot_with_cortical_idx (total: {:.2}ms wait + {:.2}ms hold, {} neurons)",
239 thread_id,
240 lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
241 lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
242 result.len());
243
244 Ok(result)
245 }
246
247 async fn get_fire_queue_sample(
248 &self,
249 ) -> ServiceResult<
250 std::collections::HashMap<u32, (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>, Vec<f32>)>,
251 > {
252 let mut runner = self.burst_runner.write();
253
254 match runner.get_fire_queue_sample() {
255 Some(sample) => {
256 let result: std::collections::HashMap<_, _> = sample.into_iter().collect();
258 Ok(result)
259 }
260 None => Ok(std::collections::HashMap::new()),
261 }
262 }
263
264 async fn get_fire_ledger_configs(&self) -> ServiceResult<Vec<(u32, usize)>> {
265 debug!("[NPU-LOCK] RUNTIME-SERVICE: get_fire_ledger_configs() called - this acquires NPU lock!");
266 let runner = self.burst_runner.read();
267 let configs = runner.get_fire_ledger_configs();
268 Ok(configs)
269 }
270
271 async fn configure_fire_ledger_window(
272 &self,
273 cortical_idx: u32,
274 window_size: usize,
275 ) -> ServiceResult<()> {
276 let mut runner = self.burst_runner.write();
277 runner
278 .configure_fire_ledger_window(cortical_idx, window_size)
279 .map_err(|e| {
280 ServiceError::Internal(format!("Failed to configure fire ledger window: {e}"))
281 })?;
282
283 info!(target: "feagi-services", "Configured Fire Ledger window for area {}: {} bursts",
284 cortical_idx, window_size);
285
286 Ok(())
287 }
288
289 async fn get_fcl_sampler_config(&self) -> ServiceResult<(f64, u32)> {
290 let runner = self.burst_runner.read();
291 Ok(runner.get_fcl_sampler_config())
292 }
293
294 async fn set_fcl_sampler_config(
295 &self,
296 frequency: Option<f64>,
297 consumer: Option<u32>,
298 ) -> ServiceResult<()> {
299 let runner = self.burst_runner.read();
300 runner.set_fcl_sampler_config(frequency, consumer);
301 Ok(())
302 }
303
304 async fn get_area_fcl_sample_rate(&self, area_id: u32) -> ServiceResult<f64> {
305 let runner = self.burst_runner.read();
306 Ok(runner.get_area_fcl_sample_rate(area_id))
307 }
308
309 async fn set_area_fcl_sample_rate(&self, area_id: u32, sample_rate: f64) -> ServiceResult<()> {
310 if sample_rate <= 0.0 || sample_rate > 1000.0 {
311 return Err(ServiceError::InvalidInput(
312 "Sample rate must be between 0 and 1000 Hz".to_string(),
313 ));
314 }
315
316 let runner = self.burst_runner.read();
317 runner.set_area_fcl_sample_rate(area_id, sample_rate);
318
319 info!(target: "feagi-services", "Set FCL sample rate for area {} to {}Hz", area_id, sample_rate);
320 Ok(())
321 }
322
323 async fn inject_sensory_by_coordinates(
324 &self,
325 cortical_id: &str,
326 xyzp_data: &[(u32, u32, u32, f32)],
327 ) -> ServiceResult<usize> {
328 let cortical_id_typed = CorticalID::try_from_base_64(cortical_id).map_err(|e| {
330 ServiceError::InvalidInput(format!("Invalid cortical ID format: {}", e))
331 })?;
332
333 let runner = self.burst_runner.read();
335 let npu = runner.get_npu();
336
337 let lock_start = std::time::Instant::now();
339 debug!(
340 "[NPU-LOCK] RUNTIME-SERVICE: Acquiring lock for manual stimulation ({} coordinates) - THIS CAN BLOCK BURST LOOP!",
341 xyzp_data.len()
342 );
343 let injected_count = {
344 let mut npu_lock = npu
345 .lock()
346 .map_err(|e| ServiceError::Backend(format!("Failed to lock NPU: {}", e)))?;
347 let lock_wait = lock_start.elapsed();
348 debug!(
349 "[NPU-LOCK] RUNTIME-SERVICE: Lock acquired for manual stimulation (waited {:.2}ms)",
350 lock_wait.as_secs_f64() * 1000.0
351 );
352 let result = npu_lock.inject_sensory_xyzp_by_id(&cortical_id_typed, xyzp_data);
353 let lock_hold_duration = lock_start.elapsed();
354 debug!(
355 "[NPU-LOCK] RUNTIME-SERVICE: Releasing lock after manual stimulation (held for {:.2}ms)",
356 lock_hold_duration.as_secs_f64() * 1000.0
357 );
358 result
359 };
360
361 if injected_count == 0 && !xyzp_data.is_empty() {
362 warn!(target: "feagi-services",
363 "No neurons found for injection: cortical_id={}, coordinates={}",
364 cortical_id, xyzp_data.len());
365 } else if injected_count > 0 {
366 info!(target: "feagi-services",
367 "Injected {} neurons into FCL for cortical area {}",
368 injected_count, cortical_id);
369 }
370
371 Ok(injected_count)
372 }
373}