Skip to main content

feagi_services/impls/
runtime_service_impl.rs

1// Copyright 2025 Neuraville Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/*!
5Runtime control service implementation.
6
7Provides control over the FEAGI burst engine runtime.
8
9Copyright 2025 Neuraville Inc.
10Licensed under the Apache License, Version 2.0
11*/
12
13use std::sync::Arc;
14
15use ahash::AHashSet;
16use async_trait::async_trait;
17use feagi_npu_burst_engine::BurstLoopRunner;
18use feagi_structures::genomic::cortical_area::CorticalID;
19use parking_lot::RwLock;
20use tracing::{debug, info, warn};
21
22use crate::traits::runtime_service::ManualStimulationMode;
23use crate::traits::RuntimeService;
24use crate::types::{RuntimeStatus, ServiceError, ServiceResult};
25
/// Default implementation of RuntimeService
///
/// Wraps the BurstLoopRunner and provides async interface for runtime control.
pub struct RuntimeServiceImpl {
    /// Shared handle to the burst loop runner that the service methods delegate to.
    burst_runner: Arc<RwLock<BurstLoopRunner>>,
    /// Service-level paused flag: set by `pause`, cleared by `start`, `stop` and `resume`.
    paused: Arc<RwLock<bool>>,
    /// Type-erased plasticity service handle. Only populated by
    /// `new_with_plasticity` when the `plasticity` feature is enabled; `None` otherwise.
    plasticity_service: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
34
35impl RuntimeServiceImpl {
36    /// Create a new RuntimeServiceImpl
37    pub fn new(burst_runner: Arc<RwLock<BurstLoopRunner>>) -> Self {
38        Self {
39            burst_runner,
40            paused: Arc::new(RwLock::new(false)),
41            plasticity_service: None,
42        }
43    }
44
45    /// Create a new RuntimeServiceImpl with plasticity support
46    #[cfg(feature = "plasticity")]
47    pub fn new_with_plasticity(
48        burst_runner: Arc<RwLock<BurstLoopRunner>>,
49        plasticity_service: Arc<feagi_npu_plasticity::PlasticityService>,
50    ) -> Self {
51        Self {
52            burst_runner,
53            paused: Arc::new(RwLock::new(false)),
54            plasticity_service: Some(plasticity_service as Arc<dyn std::any::Any + Send + Sync>),
55        }
56    }
57
58    /// Create a new RuntimeServiceImpl with plasticity support (stub for no_plasticity builds)
59    #[cfg(not(feature = "plasticity"))]
60    pub fn new_with_plasticity(
61        burst_runner: Arc<RwLock<BurstLoopRunner>>,
62        _plasticity_service: Arc<dyn std::any::Any + Send + Sync>,
63    ) -> Self {
64        Self {
65            burst_runner,
66            paused: Arc::new(RwLock::new(false)),
67            plasticity_service: None,
68        }
69    }
70}
71
72#[async_trait]
73impl RuntimeService for RuntimeServiceImpl {
74    async fn start(&self) -> ServiceResult<()> {
75        info!(target: "feagi-services", "Starting burst engine");
76
77        let mut runner = self.burst_runner.write();
78
79        runner
80            .start()
81            .map_err(|e| ServiceError::InvalidState(e.to_string()))?;
82
83        // Clear paused flag
84        *self.paused.write() = false;
85
86        Ok(())
87    }
88
89    async fn stop(&self) -> ServiceResult<()> {
90        info!(target: "feagi-services", "Stopping burst engine");
91
92        let mut runner = self.burst_runner.write();
93        runner
94            .stop_strict()
95            .map_err(|e| ServiceError::Backend(format!("Failed to stop burst engine: {}", e)))?;
96
97        // Clear paused flag
98        *self.paused.write() = false;
99
100        Ok(())
101    }
102
103    async fn pause(&self) -> ServiceResult<()> {
104        info!(target: "feagi-services", "Pausing burst engine");
105
106        let runner = self.burst_runner.read();
107        if !runner.is_running() {
108            return Err(ServiceError::InvalidState(
109                "Burst engine is not running".to_string(),
110            ));
111        }
112
113        // Set paused flag (actual pause implementation depends on burst loop design)
114        *self.paused.write() = true;
115
116        // TODO: Implement actual pause mechanism in BurstLoopRunner
117        // For now, we just track the paused state
118        warn!(target: "feagi-services", "Pause not yet implemented in BurstLoopRunner - using flag only");
119
120        Ok(())
121    }
122
123    async fn resume(&self) -> ServiceResult<()> {
124        info!(target: "feagi-services", "Resuming burst engine");
125
126        let paused = *self.paused.read();
127        if !paused {
128            return Err(ServiceError::InvalidState(
129                "Burst engine is not paused".to_string(),
130            ));
131        }
132
133        // Clear paused flag
134        *self.paused.write() = false;
135
136        // TODO: Implement actual resume mechanism in BurstLoopRunner
137        warn!(target: "feagi-services", "Resume not yet implemented in BurstLoopRunner - using flag only");
138
139        Ok(())
140    }
141
142    async fn step(&self) -> ServiceResult<()> {
143        info!(target: "feagi-services", "Executing single burst step");
144
145        let runner = self.burst_runner.read();
146        if runner.is_running() {
147            return Err(ServiceError::InvalidState(
148                "Cannot step while burst engine is running in continuous mode".to_string(),
149            ));
150        }
151
152        // TODO: Implement single-step execution in BurstLoopRunner
153        warn!(target: "feagi-services", "Single-step execution not yet implemented in BurstLoopRunner");
154
155        Err(ServiceError::NotImplemented(
156            "Single-step execution not yet implemented".to_string(),
157        ))
158    }
159
160    async fn get_status(&self) -> ServiceResult<RuntimeStatus> {
161        let runner = self.burst_runner.read();
162        let is_running = runner.is_running();
163        let burst_count = runner.get_burst_count();
164        let is_paused = *self.paused.read();
165
166        // Note: Some metrics not yet available from BurstLoopRunner
167        // - current_rate_hz: Would require tracking actual execution rate
168        // - last_burst_neuron_count: Not tracked by BurstLoopRunner
169        // - avg_burst_time_ms: Not tracked by BurstLoopRunner
170        Ok(RuntimeStatus {
171            is_running,
172            is_paused,
173            frequency_hz: runner.get_frequency(),
174            burst_count,
175            current_rate_hz: if is_running {
176                runner.get_frequency()
177            } else {
178                0.0
179            },
180            last_burst_neuron_count: 0, // Not yet tracked
181            avg_burst_time_ms: 0.0,     // Not yet tracked
182        })
183    }
184
185    async fn set_frequency(&self, frequency_hz: f64) -> ServiceResult<()> {
186        if frequency_hz <= 0.0 {
187            return Err(ServiceError::InvalidInput(
188                "Frequency must be greater than 0".to_string(),
189            ));
190        }
191
192        info!(target: "feagi-services", "Setting burst frequency to {} Hz", frequency_hz);
193
194        let mut runner = self.burst_runner.write();
195        runner.set_frequency(frequency_hz);
196
197        Ok(())
198    }
199
200    async fn get_burst_count(&self) -> ServiceResult<u64> {
201        let runner = self.burst_runner.read();
202        Ok(runner.get_burst_count())
203    }
204
205    async fn reset_burst_count(&self) -> ServiceResult<()> {
206        info!(target: "feagi-services", "Resetting burst count");
207
208        // TODO: Implement burst count reset in BurstLoopRunner
209        warn!(target: "feagi-services", "Burst count reset not yet implemented in BurstLoopRunner");
210
211        Err(ServiceError::NotImplemented(
212            "Burst count reset not yet implemented".to_string(),
213        ))
214    }
215
216    async fn register_motor_subscriptions(
217        &self,
218        agent_id: &str,
219        cortical_ids: Vec<String>,
220        rate_hz: f64,
221    ) -> ServiceResult<()> {
222        if rate_hz <= 0.0 {
223            return Err(ServiceError::InvalidInput(
224                "Motor rate must be greater than 0".to_string(),
225            ));
226        }
227
228        let cortical_set: AHashSet<String> = cortical_ids.into_iter().collect();
229        let runner = self.burst_runner.read();
230        runner
231            .register_motor_subscriptions_with_rate(agent_id.to_string(), cortical_set, rate_hz)
232            .map_err(|e| ServiceError::InvalidInput(e.to_string()))
233    }
234
235    async fn register_visualization_subscriptions(
236        &self,
237        agent_id: &str,
238        rate_hz: f64,
239    ) -> ServiceResult<()> {
240        if rate_hz <= 0.0 {
241            return Err(ServiceError::InvalidInput(
242                "Visualization rate must be greater than 0".to_string(),
243            ));
244        }
245
246        let runner = self.burst_runner.read();
247        runner
248            .register_visualization_subscriptions_with_rate(agent_id.to_string(), rate_hz)
249            .map_err(|e| ServiceError::InvalidInput(e.to_string()))
250    }
251
252    fn unregister_motor_subscriptions(&self, agent_id: &str) {
253        let runner = self.burst_runner.read();
254        runner.unregister_motor_subscriptions(agent_id);
255    }
256
257    fn unregister_visualization_subscriptions(&self, agent_id: &str) {
258        let runner = self.burst_runner.read();
259        runner.unregister_visualization_subscriptions(agent_id);
260    }
261
262    fn clear_all_motor_subscriptions(&self) {
263        let runner = self.burst_runner.read();
264        runner.clear_all_motor_subscriptions();
265    }
266
267    fn clear_all_visualization_subscriptions(&self) {
268        let runner = self.burst_runner.read();
269        runner.clear_all_visualization_subscriptions();
270    }
271
272    async fn get_fcl_snapshot(&self) -> ServiceResult<Vec<(u64, f32)>> {
273        let runner = self.burst_runner.read();
274        let fcl_data = runner.get_fcl_snapshot();
275
276        // Convert NeuronId (u32) to u64
277        let result = fcl_data
278            .iter()
279            .map(|(neuron_id, potential)| (neuron_id.0 as u64, *potential))
280            .collect();
281
282        Ok(result)
283    }
284
285    async fn get_fcl_snapshot_with_cortical_idx(&self) -> ServiceResult<Vec<(u64, u32, f32)>> {
286        let runner = self.burst_runner.read();
287        let npu = runner.get_npu();
288
289        // CRITICAL: Acquire lock ONCE and do BOTH operations (FCL snapshot + cortical_idx lookup)
290        // Previous code acquired lock twice: once for get_fcl_snapshot(), once for cortical_idx
291        let lock_start = std::time::Instant::now();
292        let thread_id = std::thread::current().id();
293        debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} attempting NPU lock for get_fcl_snapshot_with_cortical_idx at {:?}", thread_id, lock_start);
294        let result: Vec<(u64, u32, f32)> = {
295            // Acquire lock ONCE for both FCL snapshot and cortical_idx lookup
296            let npu_lock = npu.lock().unwrap();
297            let lock_acquired = std::time::Instant::now();
298            let lock_wait = lock_acquired.duration_since(lock_start);
299            debug!(
300                "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} acquired lock after {:.2}ms wait for get_fcl_snapshot_with_cortical_idx",
301                thread_id,
302                lock_wait.as_secs_f64() * 1000.0
303            );
304
305            // STRICT: Resolve cortical_idx without fallbacks (memory neurons are handled explicitly).
306            let fcl_data = npu_lock
307                .get_last_fcl_snapshot_with_cortical_idx()
308                .map_err(|e| {
309                    ServiceError::Internal(format!("Failed to resolve FCL cortical_idx: {e}"))
310                })?;
311            debug!(
312                "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} got FCL snapshot ({} neurons) with cortical_idx",
313                thread_id,
314                fcl_data.len()
315            );
316
317            fcl_data
318                .into_iter()
319                .map(|(neuron_id, cortical_idx, potential)| {
320                    (neuron_id.0 as u64, cortical_idx, potential)
321                })
322                .collect()
323        }; // Lock released here
324        let lock_released = std::time::Instant::now();
325        let _lock_hold_duration = lock_released.duration_since(lock_start);
326        debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} RELEASED NPU lock after get_fcl_snapshot_with_cortical_idx (total: {:.2}ms wait + {:.2}ms hold, {} neurons)", 
327            thread_id,
328            lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
329            lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
330            result.len());
331
332        Ok(result)
333    }
334
335    async fn get_fire_queue_sample(
336        &self,
337    ) -> ServiceResult<
338        std::collections::HashMap<u32, (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>, Vec<f32>)>,
339    > {
340        let mut runner = self.burst_runner.write();
341
342        match runner.get_fire_queue_sample() {
343            Some(sample) => {
344                // Convert AHashMap to std::HashMap for service layer compatibility
345                let result: std::collections::HashMap<_, _> = sample.into_iter().collect();
346                Ok(result)
347            }
348            None => Ok(std::collections::HashMap::new()),
349        }
350    }
351
352    async fn get_fire_ledger_configs(&self) -> ServiceResult<Vec<(u32, usize)>> {
353        debug!("[NPU-LOCK] RUNTIME-SERVICE: get_fire_ledger_configs() called - this acquires NPU lock!");
354        let runner = self.burst_runner.read();
355        let configs = runner.get_fire_ledger_configs();
356        Ok(configs)
357    }
358
359    async fn configure_fire_ledger_window(
360        &self,
361        cortical_idx: u32,
362        window_size: usize,
363    ) -> ServiceResult<()> {
364        let mut runner = self.burst_runner.write();
365        runner
366            .configure_fire_ledger_window(cortical_idx, window_size)
367            .map_err(|e| {
368                ServiceError::Internal(format!("Failed to configure fire ledger window: {e}"))
369            })?;
370
371        info!(target: "feagi-services", "Configured Fire Ledger window for area {}: {} bursts",
372            cortical_idx, window_size);
373
374        Ok(())
375    }
376
377    async fn get_fcl_sampler_config(&self) -> ServiceResult<(f64, u32)> {
378        let runner = self.burst_runner.read();
379        Ok(runner.get_fcl_sampler_config())
380    }
381
382    async fn set_fcl_sampler_config(
383        &self,
384        frequency: Option<f64>,
385        consumer: Option<u32>,
386    ) -> ServiceResult<()> {
387        let runner = self.burst_runner.read();
388        runner.set_fcl_sampler_config(frequency, consumer);
389        Ok(())
390    }
391
392    async fn get_area_fcl_sample_rate(&self, area_id: u32) -> ServiceResult<f64> {
393        let runner = self.burst_runner.read();
394        Ok(runner.get_area_fcl_sample_rate(area_id))
395    }
396
397    async fn set_area_fcl_sample_rate(&self, area_id: u32, sample_rate: f64) -> ServiceResult<()> {
398        if sample_rate <= 0.0 || sample_rate > 1000.0 {
399            return Err(ServiceError::InvalidInput(
400                "Sample rate must be between 0 and 1000 Hz".to_string(),
401            ));
402        }
403
404        let runner = self.burst_runner.read();
405        runner.set_area_fcl_sample_rate(area_id, sample_rate);
406
407        info!(target: "feagi-services", "Set FCL sample rate for area {} to {}Hz", area_id, sample_rate);
408        Ok(())
409    }
410
411    async fn inject_sensory_by_coordinates(
412        &self,
413        cortical_id: &str,
414        xyzp_data: &[(u32, u32, u32, f32)],
415        mode: ManualStimulationMode,
416    ) -> ServiceResult<usize> {
417        // Parse cortical ID from base64 string
418        let cortical_id_typed = CorticalID::try_from_base_64(cortical_id).map_err(|e| {
419            ServiceError::InvalidInput(format!("Invalid cortical ID format: {}", e))
420        })?;
421
422        // Get NPU from burst runner
423        let runner = self.burst_runner.read();
424        let npu = runner.get_npu();
425
426        // Inject using NPU's service layer method
427        let lock_start = std::time::Instant::now();
428        debug!(
429            "[NPU-LOCK] RUNTIME-SERVICE: Acquiring lock for manual stimulation ({} coordinates) - THIS CAN BLOCK BURST LOOP!",
430            xyzp_data.len()
431        );
432        let injected_count = {
433            let mut npu_lock = npu
434                .lock()
435                .map_err(|e| ServiceError::Backend(format!("Failed to lock NPU: {}", e)))?;
436            let lock_wait = lock_start.elapsed();
437            debug!(
438                "[NPU-LOCK] RUNTIME-SERVICE: Lock acquired for manual stimulation (waited {:.2}ms)",
439                lock_wait.as_secs_f64() * 1000.0
440            );
441            let result = match mode {
442                ManualStimulationMode::Candidate => {
443                    npu_lock.inject_sensory_xyzp_by_id(&cortical_id_typed, xyzp_data)
444                }
445                ManualStimulationMode::ForceFire => {
446                    npu_lock.inject_force_fire_by_coordinates(&cortical_id_typed, xyzp_data)
447                }
448            };
449            let lock_hold_duration = lock_start.elapsed();
450            debug!(
451                "[NPU-LOCK] RUNTIME-SERVICE: Releasing lock after manual stimulation (held for {:.2}ms)",
452                lock_hold_duration.as_secs_f64() * 1000.0
453            );
454            result
455        };
456
457        if injected_count == 0 && !xyzp_data.is_empty() {
458            warn!(target: "feagi-services",
459                "No neurons found for injection: cortical_id={}, coordinates={}",
460                cortical_id, xyzp_data.len());
461        }
462
463        Ok(injected_count)
464    }
465
466    async fn reset_cortical_area_states(
467        &self,
468        cortical_indices: &[u32],
469    ) -> ServiceResult<Vec<(u32, usize)>> {
470        if cortical_indices.is_empty() {
471            return Ok(Vec::new());
472        }
473
474        let runner = self.burst_runner.read();
475        let npu = runner.get_npu();
476        let mut npu_lock = npu
477            .lock()
478            .map_err(|e| ServiceError::Backend(format!("Failed to lock NPU: {e}")))?;
479
480        let mut results: Vec<(u32, usize)> = Vec::with_capacity(cortical_indices.len());
481        for &idx in cortical_indices {
482            let count = npu_lock.reset_cortical_area_runtime_state(idx);
483            results.push((idx, count));
484        }
485
486        // Release NPU lock before calling plasticity service
487        drop(npu_lock);
488        drop(runner);
489
490        // Reset memory neurons if plasticity service is available
491        #[cfg(feature = "plasticity")]
492        if let Some(plasticity_any) = &self.plasticity_service {
493            if let Some(plasticity_service) =
494                plasticity_any.downcast_ref::<feagi_npu_plasticity::PlasticityService>()
495            {
496                for &idx in cortical_indices {
497                    let memory_neuron_count = plasticity_service.reset_memory_neurons_in_area(idx);
498                    if memory_neuron_count > 0 {
499                        info!(
500                            target: "feagi-services",
501                            "Reset {} memory neurons in cortical area {}",
502                            memory_neuron_count,
503                            idx
504                        );
505                    }
506                }
507            }
508        }
509
510        Ok(results)
511    }
512}