feagi_services/impls/
runtime_service_impl.rs

1// Copyright 2025 Neuraville Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/*!
5Runtime control service implementation.
6
7Provides control over the FEAGI burst engine runtime.
8
9Copyright 2025 Neuraville Inc.
10Licensed under the Apache License, Version 2.0
11*/
12
13use std::sync::Arc;
14
15use async_trait::async_trait;
16use feagi_npu_burst_engine::BurstLoopRunner;
17use feagi_structures::genomic::cortical_area::CorticalID;
18use parking_lot::RwLock;
19use tracing::{debug, info, warn};
20
21use crate::traits::RuntimeService;
22use crate::types::{RuntimeStatus, ServiceError, ServiceResult};
23
24/// Default implementation of RuntimeService
25///
26/// Wraps the BurstLoopRunner and provides async interface for runtime control.
27pub struct RuntimeServiceImpl {
28    burst_runner: Arc<RwLock<BurstLoopRunner>>,
29    paused: Arc<RwLock<bool>>,
30}
31
32impl RuntimeServiceImpl {
33    /// Create a new RuntimeServiceImpl
34    pub fn new(burst_runner: Arc<RwLock<BurstLoopRunner>>) -> Self {
35        Self {
36            burst_runner,
37            paused: Arc::new(RwLock::new(false)),
38        }
39    }
40}
41
42#[async_trait]
43impl RuntimeService for RuntimeServiceImpl {
44    async fn start(&self) -> ServiceResult<()> {
45        info!(target: "feagi-services", "Starting burst engine");
46
47        let mut runner = self.burst_runner.write();
48
49        runner
50            .start()
51            .map_err(|e| ServiceError::InvalidState(e.to_string()))?;
52
53        // Clear paused flag
54        *self.paused.write() = false;
55
56        Ok(())
57    }
58
59    async fn stop(&self) -> ServiceResult<()> {
60        info!(target: "feagi-services", "Stopping burst engine");
61
62        let mut runner = self.burst_runner.write();
63        runner.stop();
64
65        // Clear paused flag
66        *self.paused.write() = false;
67
68        Ok(())
69    }
70
71    async fn pause(&self) -> ServiceResult<()> {
72        info!(target: "feagi-services", "Pausing burst engine");
73
74        let runner = self.burst_runner.read();
75        if !runner.is_running() {
76            return Err(ServiceError::InvalidState(
77                "Burst engine is not running".to_string(),
78            ));
79        }
80
81        // Set paused flag (actual pause implementation depends on burst loop design)
82        *self.paused.write() = true;
83
84        // TODO: Implement actual pause mechanism in BurstLoopRunner
85        // For now, we just track the paused state
86        warn!(target: "feagi-services", "Pause not yet implemented in BurstLoopRunner - using flag only");
87
88        Ok(())
89    }
90
91    async fn resume(&self) -> ServiceResult<()> {
92        info!(target: "feagi-services", "Resuming burst engine");
93
94        let paused = *self.paused.read();
95        if !paused {
96            return Err(ServiceError::InvalidState(
97                "Burst engine is not paused".to_string(),
98            ));
99        }
100
101        // Clear paused flag
102        *self.paused.write() = false;
103
104        // TODO: Implement actual resume mechanism in BurstLoopRunner
105        warn!(target: "feagi-services", "Resume not yet implemented in BurstLoopRunner - using flag only");
106
107        Ok(())
108    }
109
110    async fn step(&self) -> ServiceResult<()> {
111        info!(target: "feagi-services", "Executing single burst step");
112
113        let runner = self.burst_runner.read();
114        if runner.is_running() {
115            return Err(ServiceError::InvalidState(
116                "Cannot step while burst engine is running in continuous mode".to_string(),
117            ));
118        }
119
120        // TODO: Implement single-step execution in BurstLoopRunner
121        warn!(target: "feagi-services", "Single-step execution not yet implemented in BurstLoopRunner");
122
123        Err(ServiceError::NotImplemented(
124            "Single-step execution not yet implemented".to_string(),
125        ))
126    }
127
128    async fn get_status(&self) -> ServiceResult<RuntimeStatus> {
129        let runner = self.burst_runner.read();
130        let is_running = runner.is_running();
131        let burst_count = runner.get_burst_count();
132        let is_paused = *self.paused.read();
133
134        // Note: Some metrics not yet available from BurstLoopRunner
135        // - current_rate_hz: Would require tracking actual execution rate
136        // - last_burst_neuron_count: Not tracked by BurstLoopRunner
137        // - avg_burst_time_ms: Not tracked by BurstLoopRunner
138        Ok(RuntimeStatus {
139            is_running,
140            is_paused,
141            frequency_hz: runner.get_frequency(),
142            burst_count,
143            current_rate_hz: if is_running {
144                runner.get_frequency()
145            } else {
146                0.0
147            },
148            last_burst_neuron_count: 0, // Not yet tracked
149            avg_burst_time_ms: 0.0,     // Not yet tracked
150        })
151    }
152
153    async fn set_frequency(&self, frequency_hz: f64) -> ServiceResult<()> {
154        if frequency_hz <= 0.0 {
155            return Err(ServiceError::InvalidInput(
156                "Frequency must be greater than 0".to_string(),
157            ));
158        }
159
160        info!(target: "feagi-services", "Setting burst frequency to {} Hz", frequency_hz);
161
162        let mut runner = self.burst_runner.write();
163        runner.set_frequency(frequency_hz);
164
165        Ok(())
166    }
167
168    async fn get_burst_count(&self) -> ServiceResult<u64> {
169        let runner = self.burst_runner.read();
170        Ok(runner.get_burst_count())
171    }
172
173    async fn reset_burst_count(&self) -> ServiceResult<()> {
174        info!(target: "feagi-services", "Resetting burst count");
175
176        // TODO: Implement burst count reset in BurstLoopRunner
177        warn!(target: "feagi-services", "Burst count reset not yet implemented in BurstLoopRunner");
178
179        Err(ServiceError::NotImplemented(
180            "Burst count reset not yet implemented".to_string(),
181        ))
182    }
183
184    async fn get_fcl_snapshot(&self) -> ServiceResult<Vec<(u64, f32)>> {
185        let runner = self.burst_runner.read();
186        let fcl_data = runner.get_fcl_snapshot();
187
188        // Convert NeuronId (u32) to u64
189        let result = fcl_data
190            .iter()
191            .map(|(neuron_id, potential)| (neuron_id.0 as u64, *potential))
192            .collect();
193
194        Ok(result)
195    }
196
197    async fn get_fcl_snapshot_with_cortical_idx(&self) -> ServiceResult<Vec<(u64, u32, f32)>> {
198        let runner = self.burst_runner.read();
199        let npu = runner.get_npu();
200
201        // CRITICAL: Acquire lock ONCE and do BOTH operations (FCL snapshot + cortical_idx lookup)
202        // Previous code acquired lock twice: once for get_fcl_snapshot(), once for cortical_idx
203        let lock_start = std::time::Instant::now();
204        let thread_id = std::thread::current().id();
205        debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} attempting NPU lock for get_fcl_snapshot_with_cortical_idx at {:?}", thread_id, lock_start);
206        let result: Vec<(u64, u32, f32)> = {
207            // Acquire lock ONCE for both FCL snapshot and cortical_idx lookup
208            let npu_lock = npu.lock().unwrap();
209            let lock_acquired = std::time::Instant::now();
210            let lock_wait = lock_acquired.duration_since(lock_start);
211            debug!(
212                "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} acquired lock after {:.2}ms wait for get_fcl_snapshot_with_cortical_idx",
213                thread_id,
214                lock_wait.as_secs_f64() * 1000.0
215            );
216
217            // STRICT: Resolve cortical_idx without fallbacks (memory neurons are handled explicitly).
218            let fcl_data = npu_lock
219                .get_last_fcl_snapshot_with_cortical_idx()
220                .map_err(|e| {
221                    ServiceError::Internal(format!("Failed to resolve FCL cortical_idx: {e}"))
222                })?;
223            debug!(
224                "[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} got FCL snapshot ({} neurons) with cortical_idx",
225                thread_id,
226                fcl_data.len()
227            );
228
229            fcl_data
230                .into_iter()
231                .map(|(neuron_id, cortical_idx, potential)| {
232                    (neuron_id.0 as u64, cortical_idx, potential)
233                })
234                .collect()
235        }; // Lock released here
236        let lock_released = std::time::Instant::now();
237        let _lock_hold_duration = lock_released.duration_since(lock_start);
238        debug!("[NPU-LOCK] RUNTIME-SERVICE: Thread {:?} RELEASED NPU lock after get_fcl_snapshot_with_cortical_idx (total: {:.2}ms wait + {:.2}ms hold, {} neurons)", 
239            thread_id,
240            lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
241            lock_released.duration_since(lock_start).as_secs_f64() * 1000.0,
242            result.len());
243
244        Ok(result)
245    }
246
247    async fn get_fire_queue_sample(
248        &self,
249    ) -> ServiceResult<
250        std::collections::HashMap<u32, (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>, Vec<f32>)>,
251    > {
252        let mut runner = self.burst_runner.write();
253
254        match runner.get_fire_queue_sample() {
255            Some(sample) => {
256                // Convert AHashMap to std::HashMap for service layer compatibility
257                let result: std::collections::HashMap<_, _> = sample.into_iter().collect();
258                Ok(result)
259            }
260            None => Ok(std::collections::HashMap::new()),
261        }
262    }
263
264    async fn get_fire_ledger_configs(&self) -> ServiceResult<Vec<(u32, usize)>> {
265        debug!("[NPU-LOCK] RUNTIME-SERVICE: get_fire_ledger_configs() called - this acquires NPU lock!");
266        let runner = self.burst_runner.read();
267        let configs = runner.get_fire_ledger_configs();
268        Ok(configs)
269    }
270
271    async fn configure_fire_ledger_window(
272        &self,
273        cortical_idx: u32,
274        window_size: usize,
275    ) -> ServiceResult<()> {
276        let mut runner = self.burst_runner.write();
277        runner
278            .configure_fire_ledger_window(cortical_idx, window_size)
279            .map_err(|e| {
280                ServiceError::Internal(format!("Failed to configure fire ledger window: {e}"))
281            })?;
282
283        info!(target: "feagi-services", "Configured Fire Ledger window for area {}: {} bursts",
284            cortical_idx, window_size);
285
286        Ok(())
287    }
288
289    async fn get_fcl_sampler_config(&self) -> ServiceResult<(f64, u32)> {
290        let runner = self.burst_runner.read();
291        Ok(runner.get_fcl_sampler_config())
292    }
293
294    async fn set_fcl_sampler_config(
295        &self,
296        frequency: Option<f64>,
297        consumer: Option<u32>,
298    ) -> ServiceResult<()> {
299        let runner = self.burst_runner.read();
300        runner.set_fcl_sampler_config(frequency, consumer);
301        Ok(())
302    }
303
304    async fn get_area_fcl_sample_rate(&self, area_id: u32) -> ServiceResult<f64> {
305        let runner = self.burst_runner.read();
306        Ok(runner.get_area_fcl_sample_rate(area_id))
307    }
308
309    async fn set_area_fcl_sample_rate(&self, area_id: u32, sample_rate: f64) -> ServiceResult<()> {
310        if sample_rate <= 0.0 || sample_rate > 1000.0 {
311            return Err(ServiceError::InvalidInput(
312                "Sample rate must be between 0 and 1000 Hz".to_string(),
313            ));
314        }
315
316        let runner = self.burst_runner.read();
317        runner.set_area_fcl_sample_rate(area_id, sample_rate);
318
319        info!(target: "feagi-services", "Set FCL sample rate for area {} to {}Hz", area_id, sample_rate);
320        Ok(())
321    }
322
323    async fn inject_sensory_by_coordinates(
324        &self,
325        cortical_id: &str,
326        xyzp_data: &[(u32, u32, u32, f32)],
327    ) -> ServiceResult<usize> {
328        // Parse cortical ID from base64 string
329        let cortical_id_typed = CorticalID::try_from_base_64(cortical_id).map_err(|e| {
330            ServiceError::InvalidInput(format!("Invalid cortical ID format: {}", e))
331        })?;
332
333        // Get NPU from burst runner
334        let runner = self.burst_runner.read();
335        let npu = runner.get_npu();
336
337        // Inject using NPU's service layer method
338        let lock_start = std::time::Instant::now();
339        debug!(
340            "[NPU-LOCK] RUNTIME-SERVICE: Acquiring lock for manual stimulation ({} coordinates) - THIS CAN BLOCK BURST LOOP!",
341            xyzp_data.len()
342        );
343        let injected_count = {
344            let mut npu_lock = npu
345                .lock()
346                .map_err(|e| ServiceError::Backend(format!("Failed to lock NPU: {}", e)))?;
347            let lock_wait = lock_start.elapsed();
348            debug!(
349                "[NPU-LOCK] RUNTIME-SERVICE: Lock acquired for manual stimulation (waited {:.2}ms)",
350                lock_wait.as_secs_f64() * 1000.0
351            );
352            let result = npu_lock.inject_sensory_xyzp_by_id(&cortical_id_typed, xyzp_data);
353            let lock_hold_duration = lock_start.elapsed();
354            debug!(
355                "[NPU-LOCK] RUNTIME-SERVICE: Releasing lock after manual stimulation (held for {:.2}ms)",
356                lock_hold_duration.as_secs_f64() * 1000.0
357            );
358            result
359        };
360
361        if injected_count == 0 && !xyzp_data.is_empty() {
362            warn!(target: "feagi-services",
363                "No neurons found for injection: cortical_id={}, coordinates={}",
364                cortical_id, xyzp_data.len());
365        } else if injected_count > 0 {
366            info!(target: "feagi-services",
367                "Injected {} neurons into FCL for cortical area {}",
368                injected_count, cortical_id);
369        }
370
371        Ok(injected_count)
372    }
373}