Skip to main content

feagi_services/impls/
runtime_service_impl.rs

1// Copyright 2025 Neuraville Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/*!
5Runtime control service implementation.
6
7Provides control over the FEAGI burst engine runtime.
8
9Copyright 2025 Neuraville Inc.
10Licensed under the Apache License, Version 2.0
11*/
12
13use std::sync::Arc;
14
15use ahash::AHashSet;
16use async_trait::async_trait;
17use feagi_npu_burst_engine::BurstLoopRunner;
18use feagi_structures::genomic::cortical_area::CorticalID;
19use parking_lot::RwLock;
20use tracing::{debug, info, warn};
21
22use crate::traits::runtime_service::ManualStimulationMode;
23use crate::traits::RuntimeService;
24use crate::types::{RuntimeStatus, ServiceError, ServiceResult};
25
/// Default implementation of RuntimeService
///
/// Wraps the BurstLoopRunner and provides async interface for runtime control.
///
/// Pause/resume are tracked with a service-level flag held alongside the
/// runner; the runner itself is not told to pause (see the TODOs in the
/// `RuntimeService` impl).
pub struct RuntimeServiceImpl {
    /// Shared handle to the burst loop runner that runtime operations delegate to.
    burst_runner: Arc<RwLock<BurstLoopRunner>>,
    /// Service-level paused flag; starts as `false` in `new`.
    paused: Arc<RwLock<bool>>,
}
33
34impl RuntimeServiceImpl {
35    /// Create a new RuntimeServiceImpl
36    pub fn new(burst_runner: Arc<RwLock<BurstLoopRunner>>) -> Self {
37        Self {
38            burst_runner,
39            paused: Arc::new(RwLock::new(false)),
40        }
41    }
42}
43
44#[async_trait]
45impl RuntimeService for RuntimeServiceImpl {
46    async fn start(&self) -> ServiceResult<()> {
47        info!(target: "feagi-services", "Starting burst engine");
48
49        let mut runner = self.burst_runner.write();
50
51        runner
52            .start()
53            .map_err(|e| ServiceError::InvalidState(e.to_string()))?;
54
55        // Clear paused flag
56        *self.paused.write() = false;
57
58        Ok(())
59    }
60
61    async fn stop(&self) -> ServiceResult<()> {
62        info!(target: "feagi-services", "Stopping burst engine");
63
64        let mut runner = self.burst_runner.write();
65        runner.stop();
66
67        // Clear paused flag
68        *self.paused.write() = false;
69
70        Ok(())
71    }
72
73    async fn pause(&self) -> ServiceResult<()> {
74        info!(target: "feagi-services", "Pausing burst engine");
75
76        let runner = self.burst_runner.read();
77        if !runner.is_running() {
78            return Err(ServiceError::InvalidState(
79                "Burst engine is not running".to_string(),
80            ));
81        }
82
83        // Set paused flag (actual pause implementation depends on burst loop design)
84        *self.paused.write() = true;
85
86        // TODO: Implement actual pause mechanism in BurstLoopRunner
87        // For now, we just track the paused state
88        warn!(target: "feagi-services", "Pause not yet implemented in BurstLoopRunner - using flag only");
89
90        Ok(())
91    }
92
93    async fn resume(&self) -> ServiceResult<()> {
94        info!(target: "feagi-services", "Resuming burst engine");
95
96        let paused = *self.paused.read();
97        if !paused {
98            return Err(ServiceError::InvalidState(
99                "Burst engine is not paused".to_string(),
100            ));
101        }
102
103        // Clear paused flag
104        *self.paused.write() = false;
105
106        // TODO: Implement actual resume mechanism in BurstLoopRunner
107        warn!(target: "feagi-services", "Resume not yet implemented in BurstLoopRunner - using flag only");
108
109        Ok(())
110    }
111
112    async fn step(&self) -> ServiceResult<()> {
113        info!(target: "feagi-services", "Executing single burst step");
114
115        let runner = self.burst_runner.read();
116        if runner.is_running() {
117            return Err(ServiceError::InvalidState(
118                "Cannot step while burst engine is running in continuous mode".to_string(),
119            ));
120        }
121
122        // TODO: Implement single-step execution in BurstLoopRunner
123        warn!(target: "feagi-services", "Single-step execution not yet implemented in BurstLoopRunner");
124
125        Err(ServiceError::NotImplemented(
126            "Single-step execution not yet implemented".to_string(),
127        ))
128    }
129
130    async fn get_status(&self) -> ServiceResult<RuntimeStatus> {
131        let runner = self.burst_runner.read();
132        let is_running = runner.is_running();
133        let burst_count = runner.get_burst_count();
134        let is_paused = *self.paused.read();
135
136        // Note: Some metrics not yet available from BurstLoopRunner
137        // - current_rate_hz: Would require tracking actual execution rate
138        // - last_burst_neuron_count: Not tracked by BurstLoopRunner
139        // - avg_burst_time_ms: Not tracked by BurstLoopRunner
140        Ok(RuntimeStatus {
141            is_running,
142            is_paused,
143            frequency_hz: runner.get_frequency(),
144            burst_count,
145            current_rate_hz: if is_running {
146                runner.get_frequency()
147            } else {
148                0.0
149            },
150            last_burst_neuron_count: 0, // Not yet tracked
151            avg_burst_time_ms: 0.0,     // Not yet tracked
152        })
153    }
154
155    async fn set_frequency(&self, frequency_hz: f64) -> ServiceResult<()> {
156        if frequency_hz <= 0.0 {
157            return Err(ServiceError::InvalidInput(
158                "Frequency must be greater than 0".to_string(),
159            ));
160        }
161
162        info!(target: "feagi-services", "Setting burst frequency to {} Hz", frequency_hz);
163
164        let mut runner = self.burst_runner.write();
165        runner.set_frequency(frequency_hz);
166
167        Ok(())
168    }
169
170    async fn get_burst_count(&self) -> ServiceResult<u64> {
171        let runner = self.burst_runner.read();
172        Ok(runner.get_burst_count())
173    }
174
175    async fn reset_burst_count(&self) -> ServiceResult<()> {
176        info!(target: "feagi-services", "Resetting burst count");
177
178        // TODO: Implement burst count reset in BurstLoopRunner
179        warn!(target: "feagi-services", "Burst count reset not yet implemented in BurstLoopRunner");
180
181        Err(ServiceError::NotImplemented(
182            "Burst count reset not yet implemented".to_string(),
183        ))
184    }
185
186    async fn register_motor_subscriptions(
187        &self,
188        agent_id: &str,
189        cortical_ids: Vec<String>,
190        rate_hz: f64,
191    ) -> ServiceResult<()> {
192        if rate_hz <= 0.0 {
193            return Err(ServiceError::InvalidInput(
194                "Motor rate must be greater than 0".to_string(),
195            ));
196        }
197
198        let cortical_set: AHashSet<String> = cortical_ids.into_iter().collect();
199        let runner = self.burst_runner.read();
200        runner
201            .register_motor_subscriptions_with_rate(agent_id.to_string(), cortical_set, rate_hz)
202            .map_err(|e| ServiceError::InvalidInput(e.to_string()))
203    }
204
205    async fn register_visualization_subscriptions(
206        &self,
207        agent_id: &str,
208        rate_hz: f64,
209    ) -> ServiceResult<()> {
210        if rate_hz <= 0.0 {
211            return Err(ServiceError::InvalidInput(
212                "Visualization rate must be greater than 0".to_string(),
213            ));
214        }
215
216        let runner = self.burst_runner.read();
217        runner
218            .register_visualization_subscriptions_with_rate(agent_id.to_string(), rate_hz)
219            .map_err(|e| ServiceError::InvalidInput(e.to_string()))
220    }
221
222    fn unregister_motor_subscriptions(&self, agent_id: &str) {
223        let runner = self.burst_runner.read();
224        runner.unregister_motor_subscriptions(agent_id);
225    }
226
227    fn unregister_visualization_subscriptions(&self, agent_id: &str) {
228        let runner = self.burst_runner.read();
229        runner.unregister_visualization_subscriptions(agent_id);
230    }
231
232    async fn get_fcl_snapshot(&self) -> ServiceResult<Vec<(u64, f32)>> {
233        let runner = self.burst_runner.read();
234        let fcl_data = runner.get_fcl_snapshot();
235
236        // Convert NeuronId (u32) to u64
237        let result = fcl_data
238            .iter()
239            .map(|(neuron_id, potential)| (neuron_id.0 as u64, *potential))
240            .collect();
241
242        Ok(result)
243    }
244
245    async fn get_fcl_snapshot_with_cortical_idx(&self) -> ServiceResult<Vec<(u64, u32, f32)>> {
246        let runner = self.burst_runner.read();
247        let npu = runner.get_npu();
248
249        // CRITICAL: Acquire lock ONCE and do BOTH operations (FCL snapshot + cortical_idx lookup)
250        // Previous code acquired lock twice: once for get_fcl_snapshot(), once for cortical_idx
251        let lock_start = std::time::Instant::now();
252        let thread_id = std::thread::current().id();
253        debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} attempting NPU lock for get_fcl_snapshot_with_cortical_idx at {:?}", thread_id, lock_start);
254        let result: Vec<(u64, u32, f32)> = {
255            // Acquire lock ONCE for both FCL snapshot and cortical_idx lookup
256            let npu_lock = npu.lock().unwrap();
257            let lock_acquired = std::time::Instant::now();
258            let lock_wait = lock_acquired.duration_since(lock_start);
259            debug!(
260                "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} acquired lock after {:.2}ms wait for get_fcl_snapshot_with_cortical_idx",
261                thread_id,
262                lock_wait.as_secs_f64() * 1000.0
263            );
264
265            // STRICT: Resolve cortical_idx without fallbacks (memory neurons are handled explicitly).
266            let fcl_data = npu_lock
267                .get_last_fcl_snapshot_with_cortical_idx()
268                .map_err(|e| {
269                    ServiceError::Internal(format!("Failed to resolve FCL cortical_idx: {e}"))
270                })?;
271            debug!(
272                "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} got FCL snapshot ({} neurons) with cortical_idx",
273                thread_id,
274                fcl_data.len()
275            );
276
277            fcl_data
278                .into_iter()
279                .map(|(neuron_id, cortical_idx, potential)| {
280                    (neuron_id.0 as u64, cortical_idx, potential)
281                })
282                .collect()
283        }; // Lock released here
284        let lock_released = std::time::Instant::now();
285        let _lock_hold_duration = lock_released.duration_since(lock_start);
286        debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} RELEASED NPU lock after get_fcl_snapshot_with_cortical_idx (total: {:.2}ms wait + {:.2}ms hold, {} neurons)", 
287            thread_id,
288            lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
289            lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
290            result.len());
291
292        Ok(result)
293    }
294
295    async fn get_fire_queue_sample(
296        &self,
297    ) -> ServiceResult<
298        std::collections::HashMap<u32, (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>, Vec<f32>)>,
299    > {
300        let mut runner = self.burst_runner.write();
301
302        match runner.get_fire_queue_sample() {
303            Some(sample) => {
304                // Convert AHashMap to std::HashMap for service layer compatibility
305                let result: std::collections::HashMap<_, _> = sample.into_iter().collect();
306                Ok(result)
307            }
308            None => Ok(std::collections::HashMap::new()),
309        }
310    }
311
312    async fn get_fire_ledger_configs(&self) -> ServiceResult<Vec<(u32, usize)>> {
313        debug!("[NPU-LOCK] RUNTIME-SERVICE: get_fire_ledger_configs() called - this acquires NPU lock!");
314        let runner = self.burst_runner.read();
315        let configs = runner.get_fire_ledger_configs();
316        Ok(configs)
317    }
318
319    async fn configure_fire_ledger_window(
320        &self,
321        cortical_idx: u32,
322        window_size: usize,
323    ) -> ServiceResult<()> {
324        let mut runner = self.burst_runner.write();
325        runner
326            .configure_fire_ledger_window(cortical_idx, window_size)
327            .map_err(|e| {
328                ServiceError::Internal(format!("Failed to configure fire ledger window: {e}"))
329            })?;
330
331        info!(target: "feagi-services", "Configured Fire Ledger window for area {}: {} bursts",
332            cortical_idx, window_size);
333
334        Ok(())
335    }
336
337    async fn get_fcl_sampler_config(&self) -> ServiceResult<(f64, u32)> {
338        let runner = self.burst_runner.read();
339        Ok(runner.get_fcl_sampler_config())
340    }
341
342    async fn set_fcl_sampler_config(
343        &self,
344        frequency: Option<f64>,
345        consumer: Option<u32>,
346    ) -> ServiceResult<()> {
347        let runner = self.burst_runner.read();
348        runner.set_fcl_sampler_config(frequency, consumer);
349        Ok(())
350    }
351
352    async fn get_area_fcl_sample_rate(&self, area_id: u32) -> ServiceResult<f64> {
353        let runner = self.burst_runner.read();
354        Ok(runner.get_area_fcl_sample_rate(area_id))
355    }
356
357    async fn set_area_fcl_sample_rate(&self, area_id: u32, sample_rate: f64) -> ServiceResult<()> {
358        if sample_rate <= 0.0 || sample_rate > 1000.0 {
359            return Err(ServiceError::InvalidInput(
360                "Sample rate must be between 0 and 1000 Hz".to_string(),
361            ));
362        }
363
364        let runner = self.burst_runner.read();
365        runner.set_area_fcl_sample_rate(area_id, sample_rate);
366
367        info!(target: "feagi-services", "Set FCL sample rate for area {} to {}Hz", area_id, sample_rate);
368        Ok(())
369    }
370
371    async fn inject_sensory_by_coordinates(
372        &self,
373        cortical_id: &str,
374        xyzp_data: &[(u32, u32, u32, f32)],
375        mode: ManualStimulationMode,
376    ) -> ServiceResult<usize> {
377        // Parse cortical ID from base64 string
378        let cortical_id_typed = CorticalID::try_from_base_64(cortical_id).map_err(|e| {
379            ServiceError::InvalidInput(format!("Invalid cortical ID format: {}", e))
380        })?;
381
382        // Get NPU from burst runner
383        let runner = self.burst_runner.read();
384        let npu = runner.get_npu();
385
386        // Inject using NPU's service layer method
387        let lock_start = std::time::Instant::now();
388        debug!(
389            "[NPU-LOCK] RUNTIME-SERVICE: Acquiring lock for manual stimulation ({} coordinates) - THIS CAN BLOCK BURST LOOP!",
390            xyzp_data.len()
391        );
392        let injected_count = {
393            let mut npu_lock = npu
394                .lock()
395                .map_err(|e| ServiceError::Backend(format!("Failed to lock NPU: {}", e)))?;
396            let lock_wait = lock_start.elapsed();
397            debug!(
398                "[NPU-LOCK] RUNTIME-SERVICE: Lock acquired for manual stimulation (waited {:.2}ms)",
399                lock_wait.as_secs_f64() * 1000.0
400            );
401            let result = match mode {
402                ManualStimulationMode::Candidate => {
403                    npu_lock.inject_sensory_xyzp_by_id(&cortical_id_typed, xyzp_data)
404                }
405                ManualStimulationMode::ForceFire => {
406                    npu_lock.inject_force_fire_by_coordinates(&cortical_id_typed, xyzp_data)
407                }
408            };
409            let lock_hold_duration = lock_start.elapsed();
410            debug!(
411                "[NPU-LOCK] RUNTIME-SERVICE: Releasing lock after manual stimulation (held for {:.2}ms)",
412                lock_hold_duration.as_secs_f64() * 1000.0
413            );
414            result
415        };
416
417        if injected_count == 0 && !xyzp_data.is_empty() {
418            warn!(target: "feagi-services",
419                "No neurons found for injection: cortical_id={}, coordinates={}",
420                cortical_id, xyzp_data.len());
421        } else if injected_count > 0 {
422            info!(target: "feagi-services",
423                "Injected {} neurons into FCL for cortical area {}",
424                injected_count, cortical_id);
425        }
426
427        Ok(injected_count)
428    }
429}