Skip to main content

feagi_services/impls/
runtime_service_impl.rs

1// Copyright 2025 Neuraville Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/*!
5Runtime control service implementation.
6
7Provides control over the FEAGI burst engine runtime.
8
9Copyright 2025 Neuraville Inc.
10Licensed under the Apache License, Version 2.0
11*/
12
13use std::sync::Arc;
14
15use ahash::AHashSet;
16use async_trait::async_trait;
17use feagi_npu_burst_engine::BurstLoopRunner;
18use feagi_structures::genomic::cortical_area::CorticalID;
19use parking_lot::RwLock;
20use tracing::{debug, info, warn};
21
22use crate::traits::runtime_service::ManualStimulationMode;
23use crate::traits::RuntimeService;
24use crate::types::{RuntimeStatus, ServiceError, ServiceResult};
25
26/// Default implementation of RuntimeService
27///
28/// Wraps the BurstLoopRunner and provides async interface for runtime control.
29pub struct RuntimeServiceImpl {
30    burst_runner: Arc<RwLock<BurstLoopRunner>>,
31    paused: Arc<RwLock<bool>>,
32}
33
34impl RuntimeServiceImpl {
35    /// Create a new RuntimeServiceImpl
36    pub fn new(burst_runner: Arc<RwLock<BurstLoopRunner>>) -> Self {
37        Self {
38            burst_runner,
39            paused: Arc::new(RwLock::new(false)),
40        }
41    }
42}
43
44#[async_trait]
45impl RuntimeService for RuntimeServiceImpl {
46    async fn start(&self) -> ServiceResult<()> {
47        info!(target: "feagi-services", "Starting burst engine");
48
49        let mut runner = self.burst_runner.write();
50
51        runner
52            .start()
53            .map_err(|e| ServiceError::InvalidState(e.to_string()))?;
54
55        // Clear paused flag
56        *self.paused.write() = false;
57
58        Ok(())
59    }
60
61    async fn stop(&self) -> ServiceResult<()> {
62        info!(target: "feagi-services", "Stopping burst engine");
63
64        let mut runner = self.burst_runner.write();
65        runner
66            .stop_strict()
67            .map_err(|e| ServiceError::Backend(format!("Failed to stop burst engine: {}", e)))?;
68
69        // Clear paused flag
70        *self.paused.write() = false;
71
72        Ok(())
73    }
74
75    async fn pause(&self) -> ServiceResult<()> {
76        info!(target: "feagi-services", "Pausing burst engine");
77
78        let runner = self.burst_runner.read();
79        if !runner.is_running() {
80            return Err(ServiceError::InvalidState(
81                "Burst engine is not running".to_string(),
82            ));
83        }
84
85        // Set paused flag (actual pause implementation depends on burst loop design)
86        *self.paused.write() = true;
87
88        // TODO: Implement actual pause mechanism in BurstLoopRunner
89        // For now, we just track the paused state
90        warn!(target: "feagi-services", "Pause not yet implemented in BurstLoopRunner - using flag only");
91
92        Ok(())
93    }
94
95    async fn resume(&self) -> ServiceResult<()> {
96        info!(target: "feagi-services", "Resuming burst engine");
97
98        let paused = *self.paused.read();
99        if !paused {
100            return Err(ServiceError::InvalidState(
101                "Burst engine is not paused".to_string(),
102            ));
103        }
104
105        // Clear paused flag
106        *self.paused.write() = false;
107
108        // TODO: Implement actual resume mechanism in BurstLoopRunner
109        warn!(target: "feagi-services", "Resume not yet implemented in BurstLoopRunner - using flag only");
110
111        Ok(())
112    }
113
114    async fn step(&self) -> ServiceResult<()> {
115        info!(target: "feagi-services", "Executing single burst step");
116
117        let runner = self.burst_runner.read();
118        if runner.is_running() {
119            return Err(ServiceError::InvalidState(
120                "Cannot step while burst engine is running in continuous mode".to_string(),
121            ));
122        }
123
124        // TODO: Implement single-step execution in BurstLoopRunner
125        warn!(target: "feagi-services", "Single-step execution not yet implemented in BurstLoopRunner");
126
127        Err(ServiceError::NotImplemented(
128            "Single-step execution not yet implemented".to_string(),
129        ))
130    }
131
132    async fn get_status(&self) -> ServiceResult<RuntimeStatus> {
133        let runner = self.burst_runner.read();
134        let is_running = runner.is_running();
135        let burst_count = runner.get_burst_count();
136        let is_paused = *self.paused.read();
137
138        // Note: Some metrics not yet available from BurstLoopRunner
139        // - current_rate_hz: Would require tracking actual execution rate
140        // - last_burst_neuron_count: Not tracked by BurstLoopRunner
141        // - avg_burst_time_ms: Not tracked by BurstLoopRunner
142        Ok(RuntimeStatus {
143            is_running,
144            is_paused,
145            frequency_hz: runner.get_frequency(),
146            burst_count,
147            current_rate_hz: if is_running {
148                runner.get_frequency()
149            } else {
150                0.0
151            },
152            last_burst_neuron_count: 0, // Not yet tracked
153            avg_burst_time_ms: 0.0,     // Not yet tracked
154        })
155    }
156
157    async fn set_frequency(&self, frequency_hz: f64) -> ServiceResult<()> {
158        if frequency_hz <= 0.0 {
159            return Err(ServiceError::InvalidInput(
160                "Frequency must be greater than 0".to_string(),
161            ));
162        }
163
164        info!(target: "feagi-services", "Setting burst frequency to {} Hz", frequency_hz);
165
166        let mut runner = self.burst_runner.write();
167        runner.set_frequency(frequency_hz);
168
169        Ok(())
170    }
171
172    async fn get_burst_count(&self) -> ServiceResult<u64> {
173        let runner = self.burst_runner.read();
174        Ok(runner.get_burst_count())
175    }
176
177    async fn reset_burst_count(&self) -> ServiceResult<()> {
178        info!(target: "feagi-services", "Resetting burst count");
179
180        // TODO: Implement burst count reset in BurstLoopRunner
181        warn!(target: "feagi-services", "Burst count reset not yet implemented in BurstLoopRunner");
182
183        Err(ServiceError::NotImplemented(
184            "Burst count reset not yet implemented".to_string(),
185        ))
186    }
187
188    async fn register_motor_subscriptions(
189        &self,
190        agent_id: &str,
191        cortical_ids: Vec<String>,
192        rate_hz: f64,
193    ) -> ServiceResult<()> {
194        if rate_hz <= 0.0 {
195            return Err(ServiceError::InvalidInput(
196                "Motor rate must be greater than 0".to_string(),
197            ));
198        }
199
200        let cortical_set: AHashSet<String> = cortical_ids.into_iter().collect();
201        let runner = self.burst_runner.read();
202        runner
203            .register_motor_subscriptions_with_rate(agent_id.to_string(), cortical_set, rate_hz)
204            .map_err(|e| ServiceError::InvalidInput(e.to_string()))
205    }
206
207    async fn register_visualization_subscriptions(
208        &self,
209        agent_id: &str,
210        rate_hz: f64,
211    ) -> ServiceResult<()> {
212        if rate_hz <= 0.0 {
213            return Err(ServiceError::InvalidInput(
214                "Visualization rate must be greater than 0".to_string(),
215            ));
216        }
217
218        let runner = self.burst_runner.read();
219        runner
220            .register_visualization_subscriptions_with_rate(agent_id.to_string(), rate_hz)
221            .map_err(|e| ServiceError::InvalidInput(e.to_string()))
222    }
223
224    fn unregister_motor_subscriptions(&self, agent_id: &str) {
225        let runner = self.burst_runner.read();
226        runner.unregister_motor_subscriptions(agent_id);
227    }
228
229    fn unregister_visualization_subscriptions(&self, agent_id: &str) {
230        let runner = self.burst_runner.read();
231        runner.unregister_visualization_subscriptions(agent_id);
232    }
233
234    fn clear_all_motor_subscriptions(&self) {
235        let runner = self.burst_runner.read();
236        runner.clear_all_motor_subscriptions();
237    }
238
239    fn clear_all_visualization_subscriptions(&self) {
240        let runner = self.burst_runner.read();
241        runner.clear_all_visualization_subscriptions();
242    }
243
244    async fn get_fcl_snapshot(&self) -> ServiceResult<Vec<(u64, f32)>> {
245        let runner = self.burst_runner.read();
246        let fcl_data = runner.get_fcl_snapshot();
247
248        // Convert NeuronId (u32) to u64
249        let result = fcl_data
250            .iter()
251            .map(|(neuron_id, potential)| (neuron_id.0 as u64, *potential))
252            .collect();
253
254        Ok(result)
255    }
256
257    async fn get_fcl_snapshot_with_cortical_idx(&self) -> ServiceResult<Vec<(u64, u32, f32)>> {
258        let runner = self.burst_runner.read();
259        let npu = runner.get_npu();
260
261        // CRITICAL: Acquire lock ONCE and do BOTH operations (FCL snapshot + cortical_idx lookup)
262        // Previous code acquired lock twice: once for get_fcl_snapshot(), once for cortical_idx
263        let lock_start = std::time::Instant::now();
264        let thread_id = std::thread::current().id();
265        debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} attempting NPU lock for get_fcl_snapshot_with_cortical_idx at {:?}", thread_id, lock_start);
266        let result: Vec<(u64, u32, f32)> = {
267            // Acquire lock ONCE for both FCL snapshot and cortical_idx lookup
268            let npu_lock = npu.lock().unwrap();
269            let lock_acquired = std::time::Instant::now();
270            let lock_wait = lock_acquired.duration_since(lock_start);
271            debug!(
272                "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} acquired lock after {:.2}ms wait for get_fcl_snapshot_with_cortical_idx",
273                thread_id,
274                lock_wait.as_secs_f64() * 1000.0
275            );
276
277            // STRICT: Resolve cortical_idx without fallbacks (memory neurons are handled explicitly).
278            let fcl_data = npu_lock
279                .get_last_fcl_snapshot_with_cortical_idx()
280                .map_err(|e| {
281                    ServiceError::Internal(format!("Failed to resolve FCL cortical_idx: {e}"))
282                })?;
283            debug!(
284                "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} got FCL snapshot ({} neurons) with cortical_idx",
285                thread_id,
286                fcl_data.len()
287            );
288
289            fcl_data
290                .into_iter()
291                .map(|(neuron_id, cortical_idx, potential)| {
292                    (neuron_id.0 as u64, cortical_idx, potential)
293                })
294                .collect()
295        }; // Lock released here
296        let lock_released = std::time::Instant::now();
297        let _lock_hold_duration = lock_released.duration_since(lock_start);
298        debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} RELEASED NPU lock after get_fcl_snapshot_with_cortical_idx (total: {:.2}ms wait + {:.2}ms hold, {} neurons)", 
299            thread_id,
300            lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
301            lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
302            result.len());
303
304        Ok(result)
305    }
306
307    async fn get_fire_queue_sample(
308        &self,
309    ) -> ServiceResult<
310        std::collections::HashMap<u32, (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>, Vec<f32>)>,
311    > {
312        let mut runner = self.burst_runner.write();
313
314        match runner.get_fire_queue_sample() {
315            Some(sample) => {
316                // Convert AHashMap to std::HashMap for service layer compatibility
317                let result: std::collections::HashMap<_, _> = sample.into_iter().collect();
318                Ok(result)
319            }
320            None => Ok(std::collections::HashMap::new()),
321        }
322    }
323
324    async fn get_fire_ledger_configs(&self) -> ServiceResult<Vec<(u32, usize)>> {
325        debug!("[NPU-LOCK] RUNTIME-SERVICE: get_fire_ledger_configs() called - this acquires NPU lock!");
326        let runner = self.burst_runner.read();
327        let configs = runner.get_fire_ledger_configs();
328        Ok(configs)
329    }
330
331    async fn configure_fire_ledger_window(
332        &self,
333        cortical_idx: u32,
334        window_size: usize,
335    ) -> ServiceResult<()> {
336        let mut runner = self.burst_runner.write();
337        runner
338            .configure_fire_ledger_window(cortical_idx, window_size)
339            .map_err(|e| {
340                ServiceError::Internal(format!("Failed to configure fire ledger window: {e}"))
341            })?;
342
343        info!(target: "feagi-services", "Configured Fire Ledger window for area {}: {} bursts",
344            cortical_idx, window_size);
345
346        Ok(())
347    }
348
349    async fn get_fcl_sampler_config(&self) -> ServiceResult<(f64, u32)> {
350        let runner = self.burst_runner.read();
351        Ok(runner.get_fcl_sampler_config())
352    }
353
354    async fn set_fcl_sampler_config(
355        &self,
356        frequency: Option<f64>,
357        consumer: Option<u32>,
358    ) -> ServiceResult<()> {
359        let runner = self.burst_runner.read();
360        runner.set_fcl_sampler_config(frequency, consumer);
361        Ok(())
362    }
363
364    async fn get_area_fcl_sample_rate(&self, area_id: u32) -> ServiceResult<f64> {
365        let runner = self.burst_runner.read();
366        Ok(runner.get_area_fcl_sample_rate(area_id))
367    }
368
369    async fn set_area_fcl_sample_rate(&self, area_id: u32, sample_rate: f64) -> ServiceResult<()> {
370        if sample_rate <= 0.0 || sample_rate > 1000.0 {
371            return Err(ServiceError::InvalidInput(
372                "Sample rate must be between 0 and 1000 Hz".to_string(),
373            ));
374        }
375
376        let runner = self.burst_runner.read();
377        runner.set_area_fcl_sample_rate(area_id, sample_rate);
378
379        info!(target: "feagi-services", "Set FCL sample rate for area {} to {}Hz", area_id, sample_rate);
380        Ok(())
381    }
382
383    async fn inject_sensory_by_coordinates(
384        &self,
385        cortical_id: &str,
386        xyzp_data: &[(u32, u32, u32, f32)],
387        mode: ManualStimulationMode,
388    ) -> ServiceResult<usize> {
389        // Parse cortical ID from base64 string
390        let cortical_id_typed = CorticalID::try_from_base_64(cortical_id).map_err(|e| {
391            ServiceError::InvalidInput(format!("Invalid cortical ID format: {}", e))
392        })?;
393
394        // Get NPU from burst runner
395        let runner = self.burst_runner.read();
396        let npu = runner.get_npu();
397
398        // Inject using NPU's service layer method
399        let lock_start = std::time::Instant::now();
400        debug!(
401            "[NPU-LOCK] RUNTIME-SERVICE: Acquiring lock for manual stimulation ({} coordinates) - THIS CAN BLOCK BURST LOOP!",
402            xyzp_data.len()
403        );
404        let injected_count = {
405            let mut npu_lock = npu
406                .lock()
407                .map_err(|e| ServiceError::Backend(format!("Failed to lock NPU: {}", e)))?;
408            let lock_wait = lock_start.elapsed();
409            debug!(
410                "[NPU-LOCK] RUNTIME-SERVICE: Lock acquired for manual stimulation (waited {:.2}ms)",
411                lock_wait.as_secs_f64() * 1000.0
412            );
413            let result = match mode {
414                ManualStimulationMode::Candidate => {
415                    npu_lock.inject_sensory_xyzp_by_id(&cortical_id_typed, xyzp_data)
416                }
417                ManualStimulationMode::ForceFire => {
418                    npu_lock.inject_force_fire_by_coordinates(&cortical_id_typed, xyzp_data)
419                }
420            };
421            let lock_hold_duration = lock_start.elapsed();
422            debug!(
423                "[NPU-LOCK] RUNTIME-SERVICE: Releasing lock after manual stimulation (held for {:.2}ms)",
424                lock_hold_duration.as_secs_f64() * 1000.0
425            );
426            result
427        };
428
429        if injected_count == 0 && !xyzp_data.is_empty() {
430            warn!(target: "feagi-services",
431                "No neurons found for injection: cortical_id={}, coordinates={}",
432                cortical_id, xyzp_data.len());
433        }
434
435        Ok(injected_count)
436    }
437}