eryon_rt/runtime/
impl_inner.rs

1/*
2    Appellation: impl_inner <module>
3    Contrib: @FL03
4*/
5use crate::runtime::RuntimeInner;
6
7use crate::actors::prelude::{Driver, TriadDriver, VNode};
8use crate::frag::FragTonnetz;
9use crate::orcha::Orchestrator;
10use crate::scheduler::Scheduler;
11use crate::tasks::{Task, TaskStatus, TaskType, TaskTypeKey};
12use crate::types::{PerformanceTracker, SystemInfo};
13use rshyper::EdgeId;
14use rstmt::nrt::Triad;
15use scsys::Timestamp;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19impl<D> RuntimeInner<D>
20where
21    D: Driver<Triad>,
22{
23    /// Create a new runtime
24    pub fn new() -> Self {
25        let fragment = FragTonnetz::new();
26        let orchestrator = Orchestrator::new();
27        let system_info = SystemInfo::new();
28        let perf_tracker = PerformanceTracker::new();
29
30        Self {
31            fragment,
32            orchestrator,
33            scheduler: Scheduler::new(),
34            perf_tracker,
35            system_info,
36        }
37    }
38    /// Get a reference to the fragment
39    pub fn fragment(&self) -> &FragTonnetz<D> {
40        &self.fragment
41    }
42    /// Get a mutable reference to the fragment
43    pub fn fragment_mut(&mut self) -> &mut FragTonnetz<D> {
44        &mut self.fragment
45    }
46    /// Get a reference to the orchestrator
47    pub fn orchestrator(&self) -> &Orchestrator {
48        &self.orchestrator
49    }
50    /// returns a mutable reference to the orchestrator
51    pub fn orchestrator_mut(&mut self) -> &mut Orchestrator {
52        &mut self.orchestrator
53    }
54    /// returns an immutable reference to the performance tracker
55    pub const fn perf_tracker(&self) -> &PerformanceTracker {
56        &self.perf_tracker
57    }
58    /// returns a mutable reference to the performance tracker
59    pub fn perf_tracker_mut(&mut self) -> &mut PerformanceTracker {
60        &mut self.perf_tracker
61    }
62    /// Get a performance trend report
63    pub fn performance_trend_report(&self) -> String {
64        self.perf_tracker.generate_trend_report()
65    }
66    /// returns a reference to the scheduler
67    pub const fn scheduler(&self) -> &Scheduler {
68        &self.scheduler
69    }
70    /// returns a mutable reference to the scheduler
71    pub fn scheduler_mut(&mut self) -> &mut Scheduler {
72        &mut self.scheduler
73    }
74    /// returns an immutable reference to the system's info
75    pub const fn system_info(&self) -> &SystemInfo {
76        &self.system_info
77    }
78    /// returns a mutable reference to the system's info
79    pub fn system_info_mut(&mut self) -> &mut SystemInfo {
80        &mut self.system_info
81    }
82    /// returns a reference to the tasks stored in the scheduler
83    pub fn task_queue(&self) -> &[Task] {
84        &self.scheduler.task_queue
85    }
86    /// returns a mutable reference to the task queue within the scheduler
87    pub fn task_queue_mut(&mut self) -> &mut Vec<Task> {
88        &mut self.scheduler.task_queue
89    }
90    /// returns an immutable reference to the initialized virtual node's
91    pub fn vnodes(&self) -> &HashMap<EdgeId, VNode<D>> {
92        self.fragment.partitions()
93    }
94    /// returns a mutable reference to the initialized virtual nodes
95    pub fn vnodes_mut(&mut self) -> &mut HashMap<EdgeId, VNode<D>> {
96        self.fragment.partitions_mut()
97    }
98    /// set the runtimes fragment to the given value
99    pub fn set_fragment(&mut self, fragment: FragTonnetz<D>) {
100        self.fragment = fragment;
101    }
102    /// set the runtime's orchestrator
103    pub fn set_orchestrator(&mut self, orchestrator: Orchestrator) {
104        self.orchestrator = orchestrator;
105    }
106    /// set the runtime's scheduler
107    pub fn set_scheduler(&mut self, scheduler: Scheduler) {
108        self.scheduler = scheduler;
109    }
110    /// consumes the current instance to create another with the given fragment
111    pub fn with_fragment(self, fragment: FragTonnetz<D>) -> Self {
112        Self { fragment, ..self }
113    }
114    /// consumes the current instance to create another with the given orchestrator
115    pub fn with_orchestrator(self, orchestrator: Orchestrator) -> Self {
116        Self {
117            orchestrator,
118            ..self
119        }
120    }
121    /// consumes the current instance to create another with the given scheduler
122    pub fn with_scheduler(self, scheduler: Scheduler) -> Self {
123        Self { scheduler, ..self }
124    }
125    /// Update performance tracking with current metrics
126    pub fn update_performance_metrics(&mut self) {
127        // Get current metrics
128        let metrics = self.runtime_metrics();
129
130        // Record in the tracker
131        self.perf_tracker.record_metrics(metrics);
132    }
133}
134
135impl<D> RuntimeInner<D>
136where
137    D: Driver<Triad>,
138{
    /// Adaptive task scheduling based on system resources and task performance history.
    ///
    /// Re-ranks every queued task by blending four weighted factors: historical
    /// average execution time for the task's kind (weight 0.3), current system
    /// load (0.2), how long the task has been waiting (0.2), and the load of
    /// the node the task targets (0.3). The task's base `priority` multiplied
    /// by that blend becomes its `effective_priority`; the base priority field
    /// itself is never modified. Finally the queue is re-sorted by effective
    /// priority, highest first.
    pub fn adaptive_schedule(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Track historical execution times by task type discriminant (TaskTypeKey)
        let mut task_type_metrics = HashMap::new();

        // Get completed tasks from scheduler
        for completed_task in self.scheduler.completed_tasks() {
            // Fall back to "now" / creation time when completion or start
            // timestamps were never recorded, so the subtraction is total.
            let execution_time = completed_task
                .completed_at
                .unwrap_or_else(Instant::now)
                .duration_since(
                    completed_task
                        .started_at
                        .unwrap_or(completed_task.created_at),
                );

            // Use TaskTypeKey for the hashmap key instead of TaskType
            let key = TaskTypeKey::from(&completed_task.kind);

            // Accumulate (total duration, sample count) per task kind.
            let entry = task_type_metrics
                .entry(key)
                .or_insert_with(|| (Duration::from_secs(0), 0));

            entry.0 += execution_time;
            entry.1 += 1;
        }

        // Calculate average execution times
        let mut avg_execution_times = HashMap::new();
        for (key, (total_time, count)) in task_type_metrics {
            if count > 0 {
                let avg_time = total_time / count as u32;
                avg_execution_times.insert(key, avg_time);
            }
        }

        // Calculate current system load
        let system_load = self.system_info.get_current_load();
        let node_loads = self.fragment.get_all_edge_loads();

        // For each queued task, adjust priority
        let now = Instant::now();
        for task in self.task_queue_mut() {
            if matches!(task.status, TaskStatus::Queued) {
                let base_priority = task.priority as f32;

                // Factor 1: Expected execution time - use TaskTypeKey
                let key = TaskTypeKey::from(&task.kind);
                let time_factor = if let Some(avg_time) = avg_execution_times.get(&key) {
                    // Normalize: shorter tasks get higher priority
                    let max_time = Duration::from_secs(60); // Assume 60s max task time
                    1.0 - (avg_time.as_secs_f32() / max_time.as_secs_f32()).min(1.0)
                } else {
                    0.5 // Default for unknown task types
                };

                // Factor 2: System load (a lighter system gives a bigger boost)
                let load_factor = 1.0 - system_load;

                // Factor 3: Task age
                let age = now.duration_since(task.created_at);
                let age_factor = (age.as_secs_f32() / 60.0).min(1.0); // Max boost after 60 seconds

                // Factor 4: Target node resource availability
                let node_factor = match &task.kind {
                    TaskType::Transform { node_id, .. } => {
                        1.0 - node_loads.get(node_id).copied().unwrap_or(0.5)
                    }
                    TaskType::LearnPattern { node_id, .. } => {
                        1.0 - node_loads.get(node_id).copied().unwrap_or(0.5)
                    }
                    _ => 0.5, // Default for tasks not targeting specific nodes
                };

                // Compute adjusted priority as a weighted blend of all factors
                let adjusted_priority = base_priority
                    * (0.3 * time_factor
                        + 0.2 * load_factor
                        + 0.2 * age_factor
                        + 0.3 * node_factor);

                // Update task's effective priority (but keep original base priority)
                task.effective_priority = adjusted_priority.round() as usize;
            }
        }

        // Sort by effective priority (higher first); the stable sort keeps
        // FIFO order among tasks whose effective priorities tie.
        self.task_queue_mut()
            .sort_by(|a, b| b.effective_priority.cmp(&a.effective_priority));

        Ok(())
    }
234    /// Add a triad to the fragment
235    pub fn add_triad(&mut self, triad: Triad) -> crate::Result<rshyper::EdgeId> {
236        self.fragment.add_triad(triad)
237    }
238    /// Auto-configure the runtime based on available system resources
239    pub fn auto_configure(&mut self) -> crate::Result<()> {
240        // Calculate optimal number of partitions based on CPU cores
241        let optimal_partitions = self.system_info.get_cpu_cores().max(1);
242
243        // If we already have enough partitions, we're good
244        if self.fragment.total_partitions() >= optimal_partitions {
245            return Ok(());
246        }
247
248        // Otherwise, initialize with all 24 major and minor triads
249        self.init_complete_layer()
250    }
251
252    pub fn completed_tasks(&self) -> Vec<&Task> {
253        self.scheduler.completed_tasks()
254    }
255    /// Coordinate distributed learning with gradient sharing
256    pub fn coordinate_distributed_learning(&mut self) -> crate::Result<usize>
257    where
258        D: TriadDriver,
259    {
260        // Group nodes by chord class
261        let mut nodes_by_class = std::collections::HashMap::new();
262
263        for (&edge_id, node) in self.fragment.partitions() {
264            if node.has_surface_network() {
265                nodes_by_class
266                    .entry(node.class())
267                    .or_insert_with(Vec::new)
268                    .push(edge_id);
269            }
270        }
271
272        // For each class, coordinate learning
273        for (_class, node_ids) in nodes_by_class {
274            if node_ids.len() <= 1 {
275                continue;
276            }
277
278            // 1. Collect gradients from all nodes
279            let mut all_gradients = Vec::new();
280
281            for node_id in &node_ids {
282                if let Some(node) = self.fragment.get_vnode(node_id) {
283                    let gradients = node.extract_learning_gradients()?;
284                    all_gradients.push((node_id, gradients));
285                }
286            }
287
288            // 2. Average the gradients
289            let mut averaged_gradients = HashMap::new();
290
291            for (_node_id, gradients) in all_gradients {
292                for (pattern_id, gradient) in gradients {
293                    averaged_gradients
294                        .entry(pattern_id)
295                        .or_insert_with(Vec::new)
296                        .push(gradient);
297                }
298            }
299
300            // 3. Apply averaged gradients back to nodes
301            for node_id in &node_ids {
302                if let Some(node) = self.fragment.get_vnode_mut(node_id) {
303                    node.apply_federated_gradients(&averaged_gradients)?;
304                }
305            }
306
307            // 4. Schedule evaluation task
308            self.schedule_task(TaskType::EvaluatePerformance, 8);
309        }
310
311        // Record this coordination activity
312        let task_id = self.schedule_task(TaskType::SharePatterns, 6);
313
314        Ok(task_id)
315    }
    /// Coordinate knowledge sharing between surface networks.
    ///
    /// Pipeline: (1) find nodes whose surface network holds more than 10
    /// features, (2) extract each one's discrete knowledge patterns,
    /// (3) hash the combined pattern set into a consensus value, (4) for each
    /// target node score its tonal proximity to every other source and pick a
    /// proximity-proportional subset of that source's patterns, and
    /// (5) integrate the selected patterns into the target. The transfer
    /// event is then registered together with the consensus hash.
    ///
    /// # Errors
    /// This function itself never propagates extraction/integration failures —
    /// they are logged (or printed) and skipped — so it currently always
    /// returns `Ok(())`.
    pub fn coordinate_knowledge_transfer(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // 1. Identify knowledge-bearing nodes
        let nodes_with_knowledge: Vec<_> = self
            .fragment
            .partitions()
            .iter()
            .filter(|(_, node)| node.has_surface_network() && node.total_features() > 10)
            .map(|(&id, _)| id)
            .collect();

        if nodes_with_knowledge.len() <= 1 {
            return Ok(()); // Nothing to coordinate
        }

        // 2. Extract knowledge patterns
        let mut knowledge_patterns = HashMap::new();
        for node_id in &nodes_with_knowledge {
            if let Some(node) = self.fragment.get_vnode(node_id) {
                // Extract discrete pattern representations
                match node.extract_knowledge_patterns() {
                    Ok(patterns) => {
                        // Only keep nodes that actually produced patterns.
                        if !patterns.is_empty() {
                            knowledge_patterns.insert(*node_id, patterns);
                        }
                    }
                    Err(_e) => {
                        // Extraction failures are non-fatal: log and move on.
                        #[cfg(feature = "tracing")]
                        tracing::warn!(
                            "Failed to extract knowledge from node {}: {:?}",
                            node_id,
                            _e
                        );
                        continue;
                    }
                }
            }
        }

        if knowledge_patterns.is_empty() {
            return Ok(());
        }

        // 3. Generate a consensus hash for verification
        let consensus_hash = self.compute_knowledge_consensus_hash(&knowledge_patterns);

        // 4. Calculate all proximities first to avoid borrowing conflicts
        //    (reads of `self.fragment` here, mutations only in step 5).
        let mut transfer_data = Vec::new();

        for &target_id in &nodes_with_knowledge {
            let mut source_pattern_data = Vec::new();

            for (&source_id, patterns) in &knowledge_patterns {
                if source_id != target_id {
                    // Calculate proximity in the tonal space
                    let proximity = if let Some(distance) =
                        self.fragment.voice_leading_distance(&target_id, &source_id)
                    {
                        1.0 / (1.0 + distance as f32 / 6.0) // Normalize by octave divisions
                    } else {
                        0.5 // Default if we can't calculate distance
                    };

                    // Only integrate if there's significant proximity
                    if proximity > 0.3 {
                        // Select patterns to integrate based on proximity
                        let pattern_count = (patterns.len() as f32 * proximity).ceil() as usize;
                        if pattern_count > 0 {
                            let selected_patterns = if pattern_count >= patterns.len() {
                                patterns.clone()
                            } else {
                                // Take most important patterns first
                                // NOTE(review): this assumes extract_knowledge_patterns
                                // returns patterns in importance order — TODO confirm.
                                patterns.iter().take(pattern_count).cloned().collect()
                            };

                            source_pattern_data.push((source_id, selected_patterns));
                        }
                    }
                }
            }

            // Store all transfer data for this target
            if !source_pattern_data.is_empty() {
                transfer_data.push((target_id, source_pattern_data));
            }
        }

        // 5. Now perform the actual knowledge transfer without concurrent borrowing
        for (target_id, source_patterns) in transfer_data {
            if let Some(target_node) = self.fragment.get_vnode_mut(&target_id) {
                // Apply knowledge from sources
                for (source_id, patterns) in source_patterns {
                    if let Err(e) = target_node.integrate_external_knowledge(&patterns, &source_id)
                    {
                        #[cfg(feature = "tracing")]
                        tracing::error!("Failed to integrate knowledge: {:?}", e);
                        #[cfg(not(feature = "tracing"))]
                        eprintln!("Failed to integrate knowledge: {:?}", e);
                    }
                }
            }
        }

        // 6. Register the knowledge transfer event
        self.register_knowledge_transfer(consensus_hash, nodes_with_knowledge.len());

        Ok(())
    }
427    /// coordinate learning between nodes
428    pub fn coordinate_learning(&mut self) -> crate::Result<()>
429    where
430        D: TriadDriver,
431    {
432        // 1. Identify knowledge-bearing nodes
433        let nodes_with_knowledge: Vec<_> = self
434            .fragment
435            .partitions()
436            .iter()
437            .filter(|(_, node)| node.has_surface_network() && node.total_features() > 0)
438            .map(|(&id, _)| id)
439            .collect();
440
441        if nodes_with_knowledge.len() <= 1 {
442            return Ok(()); // Nothing to coordinate
443        }
444
445        // 2. Extract patterns once per node - this is our main optimization
446        let mut extracted_patterns = Vec::with_capacity(nodes_with_knowledge.len());
447
448        for &node_id in &nodes_with_knowledge {
449            if let Some(node) = self.get_vnode(&node_id) {
450                match node.extract_knowledge_patterns() {
451                    Ok(patterns) if !patterns.is_empty() => {
452                        extracted_patterns.push((node_id, patterns));
453                    }
454                    _ => continue,
455                }
456            }
457        }
458
459        // 3. Use a constant limit on targets per source to control complexity
460        let max_targets_per_source = core::cmp::min(5, nodes_with_knowledge.len() - 1);
461
462        // 4. Perform transfers using the extracted patterns
463        for (source_id, patterns) in extracted_patterns {
464            // Skip empty pattern sets
465            if patterns.is_empty() {
466                continue;
467            }
468
469            // Choose target nodes (limited number)
470            let mut target_count = 0;
471
472            for target_id in &nodes_with_knowledge {
473                if target_id != &source_id && target_count < max_targets_per_source {
474                    if let Some(target_node) = self.get_vnode_mut(target_id) {
475                        if let Err(_e) =
476                            target_node.integrate_external_knowledge(&patterns, &source_id)
477                        {
478                            #[cfg(feature = "tracing")]
479                            tracing::error!("Failed to integrate knowledge: {_e:?}");
480                            #[cfg(not(feature = "tracing"))]
481                            eprintln!("Failed to integrate knowledge: {_e:?}");
482                        }
483
484                        target_count += 1;
485                    }
486                }
487            }
488        }
489
490        // 5. Record coordination event
491        for node in self.fragment_mut().partitions_mut().values_mut() {
492            node.store_mut().record_event(
493                "knowledge_coordination",
494                Some(vec![nodes_with_knowledge.len()]),
495            );
496        }
497
498        Ok(())
499    }
500    /// executes the task associated with the given identitifier
501    pub fn execute_task(&mut self, task_id: usize) -> crate::Result<()>
502    where
503        D: TriadDriver,
504    {
505        let task = self
506            .scheduler
507            .get_task(task_id)
508            .ok_or_else(|| crate::RuntimeError::TaskNotFound(task_id))?;
509        self.orchestrator.execute_task(&mut self.fragment, task)
510    }
511    /// Execute tasks in the queue
512    pub fn execute_tasks(&mut self) -> crate::Result<()>
513    where
514        D: TriadDriver,
515    {
516        // Use the scheduler to process tasks
517        let _res = self
518            .scheduler
519            .process_batch(&mut self.fragment, &mut self.orchestrator)?;
520        Ok(())
521    }
522    /// returns an immutable reference to the node with the given identifier; if any
523    pub fn get_vnode(&self, id: &EdgeId) -> Option<&VNode<D>> {
524        self.fragment().get_vnode(id)
525    }
526    /// returns a mutable reference to the partition with the given identifier; if any
527    pub fn get_vnode_mut(&mut self, id: &EdgeId) -> Option<&mut VNode<D>> {
528        self.fragment_mut().get_vnode_mut(id)
529    }
530    /// Initialize the runtime with major and minor triads
531    pub fn init_complete_layer(&mut self) -> crate::Result<()> {
532        // Initialize the fragment with all triads
533        self.fragment.initialize_with_all_triads()?;
534
535        // Initialize the orchestrator
536        self.orchestrator.initialize(&self.fragment);
537
538        Ok(())
539    }
540    /// Initialize surface networks for all nodes in the fragment
541    pub fn init_node_surfaces(&mut self) -> crate::Result<()>
542    where
543        D: TriadDriver,
544    {
545        let mut initialized_count = 0;
546
547        // Iterate through all partitions
548        for node in self.fragment.partitions_mut().values_mut() {
549            // Skip nodes that already have a surface network
550            if node.has_surface_network() {
551                continue;
552            }
553
554            // Initialize the surface network for this node
555            node.init_surface()?;
556
557            // Track how many we've initialized
558            initialized_count += 1;
559        }
560
561        if initialized_count > 0 {
562            #[cfg(feature = "tracing")]
563            tracing::debug!("Initialized {} node surface networks", initialized_count);
564        }
565
566        Ok(())
567    }
568    /// Initialize surface networks with class-specific tuning
569    pub fn init_specialized_surfaces(&mut self) -> crate::Result<()>
570    where
571        D: TriadDriver,
572    {
573        use crate::nrt::Triads::*;
574        // Group nodes by chord class
575        let mut nodes_by_class = std::collections::HashMap::new();
576
577        for (&edge_id, node) in self.fragment.partitions() {
578            nodes_by_class
579                .entry(node.class())
580                .or_insert_with(Vec::new)
581                .push(edge_id);
582        }
583
584        // Parameters optimized for each chord class
585        let class_params: HashMap<_, _> = [
586            // Major triads: More emphasis on dominant transformations
587            (Major, (0.02, 0.6, 0.4)),
588            // Minor triads: More sensitivity to subdominant transformations
589            (Minor, (0.01, 0.8, 0.5)),
590            // Diminished triads: Higher resolution for chromatic movement
591            (Diminished, (0.04, 0.7, 0.6)),
592            // Augmented triads: Focus on symmetrical properties
593            (Augmented, (0.05, 0.5, 0.8)),
594        ]
595        .into_iter()
596        .collect();
597
598        // Apply specialized parameters for each node
599        for (class, node_ids) in &nodes_by_class {
600            let params = class_params.get(class).unwrap_or(&(0.01, 0.7, 0.5));
601
602            for edge_id in node_ids {
603                if let Some(node) = self.fragment.get_vnode_mut(edge_id) {
604                    if !node.has_surface_network() {
605                        let config = crate::actors::surface::SurfaceModelConfig::new(
606                            params.0, params.1, params.2,
607                        );
608                        // Initialize with class-specific parameters
609                        node.surface_mut().map(|i| i.model_mut().set_config(config));
610                    }
611                }
612            }
613        }
614
615        #[cfg(feature = "tracing")]
616        tracing::debug!(
617            "Initialized specialized surface networks for {} classes",
618            nodes_by_class.len()
619        );
620
621        Ok(())
622    }
623    /// Optimize memory allocation across nodes
624    pub fn optimize_memory_allocation(&mut self) -> crate::Result<()>
625    where
626        D: TriadDriver,
627    {
628        // Get current memory usage per node
629        let mut node_memory_usage = HashMap::new();
630        let mut total_features = 0;
631
632        for (&edge_id, node) in self.fragment.partitions() {
633            let feature_count = node.total_features();
634            node_memory_usage.insert(edge_id, feature_count);
635            total_features += feature_count;
636        }
637
638        // Calculate average memory usage
639        if self.fragment.total_partitions() == 0 {
640            return Ok(());
641        }
642
643        let avg_features = total_features / self.fragment.total_partitions();
644        let target_max = (avg_features as f32 * 1.5) as usize; // Allow 50% above average
645
646        // Find nodes with excessive features
647        let mut nodes_to_optimize = Vec::new();
648        for (&edge_id, &feature_count) in &node_memory_usage {
649            if feature_count > target_max {
650                nodes_to_optimize.push((edge_id, feature_count));
651            }
652        }
653
654        // Sort by feature count (highest first)
655        nodes_to_optimize.sort_by(|a, b| b.1.cmp(&a.1));
656
657        // Optimize each node
658        for (edge_id, _) in nodes_to_optimize {
659            if let Some(node) = self.fragment.get_vnode_mut(&edge_id) {
660                // Consolidate patterns with similar features
661                node.consolidate_similar_patterns(0.8)?;
662
663                // Prune low-importance features
664                node.prune_low_importance_features(0.2)?;
665            }
666        }
667
668        Ok(())
669    }
    /// Optimize the surface networks across all nodes.
    ///
    /// Groups nodes by chord class, selects the member with the most learned
    /// features (among those with an initialized surface network) as the
    /// class's best performer, and queues `PatternShare` messages from that
    /// node toward every other member. All queued messages are then handled
    /// in a single `process_messages` pass.
    pub fn optimize_surface_networks(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Group nodes by chord class
        let mut nodes_by_class = std::collections::HashMap::new();

        // Analyze all nodes and group by chord class
        for (&edge_id, node) in self.fragment.partitions() {
            nodes_by_class
                .entry(node.class())
                .or_insert_with(Vec::new)
                .push(edge_id);
        }

        // For each class, find best-performing node and share patterns
        for (_class, node_ids) in nodes_by_class {
            if node_ids.len() <= 1 {
                continue;
            }

            // Find node with most learning
            let mut best_node_id = None;
            let mut most_features = 0;

            for id in &node_ids {
                if let Some(node) = self.fragment.get_vnode(id) {
                    let features = node.total_features();
                    // Strictly-greater comparison: the first node seen with
                    // the maximum count wins ties; nodes with zero features
                    // are never selected.
                    if features > most_features && node.has_surface_network() {
                        best_node_id = Some(*id);
                        most_features = features;
                    }
                }
            }

            // Share patterns from best node to others
            if let Some(source_id) = best_node_id {
                for &target_id in &node_ids {
                    if target_id != source_id {
                        // Create message to share patterns
                        // NOTE(review): the payload is sent empty and
                        // `target_id` is not carried in the message —
                        // confirm the handler resolves targets and fills
                        // the pattern in.
                        self.orchestrator
                            .send_message(crate::NodeMessage::PatternShare {
                                source: source_id,
                                pattern: vec![], // Will be populated by the node
                                importance: 0.8,
                            });
                    }
                }
            }
        }

        // Process the messages
        self.process_messages()?;

        Ok(())
    }
727    /// Get predicted resource requirements for a task
728    pub fn predict_task_resources(&self, task_type: &TaskType) -> (f32, f32) {
729        // Use discriminant/TaskTypeKey to avoid floating-point issues in pattern matching
730        match TaskTypeKey::from(task_type) {
731            TaskTypeKey::Transform => (5.0, 50.0),
732            TaskTypeKey::LearnPattern => (20.0, 100.0),
733            TaskTypeKey::SharePatterns => (10.0, 80.0),
734            TaskTypeKey::OptimizeNode => (15.0, 30.0),
735            _ => (5.0, 20.0), // Default for unknown tasks
736        }
737    }
738    /// Schedule tasks using predictive resource management
739    pub fn predictive_schedule(&mut self, look_ahead_ms: u64) -> crate::Result<()>
740    where
741        D: TriadDriver,
742    {
743        // 1. Get current system state
744        let current_cpu = self.system_info.get_cpu_usage();
745        let current_memory = self.system_info.get_ram_memory_usage();
746        let available_memory =
747            self.system_info.get_free_memory_mb() as f32 * (1.0 - current_memory);
748
749        // 2. Estimate resource availability window
750        // TODO: find a way to use these values
751        let _cpu_headroom = 1.0 - current_cpu;
752        let _mem_headroom = available_memory;
753
754        // 3. Build resource timeline for currently executing tasks
755        let mut cpu_timeline = vec![current_cpu];
756        let mut mem_timeline = vec![current_memory];
757        let timeline_slots = 10; // Divide look-ahead into slots
758
759        // Get currently running tasks
760        let running_tasks = self.scheduler.get_running_tasks();
761
762        // Estimate when each task will complete and free resources
763        for slot in 1..timeline_slots {
764            let slot_time = Instant::now()
765                + Duration::from_millis(look_ahead_ms * slot as u64 / timeline_slots as u64);
766
767            // Start with current usage
768            let mut cpu_usage = current_cpu;
769            let mut mem_usage = current_memory;
770
771            // Adjust for tasks expected to complete by this time
772            for task in &running_tasks {
773                let task_started = task.started_at.unwrap_or(task.created_at);
774                let (task_cpu, task_mem) = self.predict_task_resources(&task.kind);
775
776                // Estimate task duration based on task type discriminant
777                let est_duration = {
778                    let task_key = TaskTypeKey::from(&task.kind);
779                    self.scheduler.estimate_task_duration(task_key)
780                };
781                let expected_completion = task_started + est_duration;
782
783                // If task still running at this slot, count its resources
784                if expected_completion > slot_time {
785                    cpu_usage += task_cpu / 100.0;
786                    mem_usage += task_mem / self.system_info.get_free_memory_mb() as f32;
787                }
788            }
789
790            // Cap at reasonable values
791            cpu_usage = cpu_usage.min(1.0).max(0.1);
792            mem_usage = mem_usage.min(1.0).max(0.1);
793
794            cpu_timeline.push(cpu_usage);
795            mem_timeline.push(mem_usage);
796        }
797
798        // 4. Find optimal scheduling slot for each queued task
799        let mut scheduled_tasks = Vec::new();
800
801        for task in self.task_queue() {
802            if !matches!(task.status, TaskStatus::Queued) {
803                continue;
804            }
805
806            let (task_cpu, task_mem) = self.predict_task_resources(&task.kind);
807            let normalized_cpu = task_cpu / 100.0;
808            let normalized_mem = task_mem / self.system_info.get_free_memory_mb() as f32;
809
810            // Find the best slot to schedule this task
811            let mut best_slot = 0;
812            let mut best_fit = f32::MAX;
813
814            for slot in 0..timeline_slots {
815                // Check if resources available in this slot
816                let slot_cpu = cpu_timeline[slot];
817                let slot_mem = mem_timeline[slot];
818
819                if slot_cpu + normalized_cpu <= 1.0 && slot_mem + normalized_mem <= 1.0 {
820                    // Calculate how well the task fits this slot (smaller is better)
821                    let fit = normalized_cpu / (1.0 - slot_cpu) + normalized_mem / (1.0 - slot_mem);
822
823                    if fit < best_fit {
824                        best_fit = fit;
825                        best_slot = slot;
826                    }
827                }
828            }
829
830            // If we found a slot, schedule task with delay
831            if best_fit < f32::MAX {
832                let delay_ms = look_ahead_ms * best_slot as u64 / timeline_slots as u64;
833
834                // Add task to the scheduled list with its delay
835                scheduled_tasks.push((task.id, delay_ms));
836
837                // Update resource timeline
838                if best_slot < timeline_slots {
839                    for slot in best_slot..timeline_slots {
840                        cpu_timeline[slot] += normalized_cpu;
841                        mem_timeline[slot] += normalized_mem;
842                    }
843                }
844            }
845        }
846
847        // 5. Apply scheduling delays
848        for (task_id, delay_ms) in scheduled_tasks {
849            if let Some(task) = self.task_queue_mut().iter_mut().find(|t| t.id == task_id) {
850                task.scheduled_delay = Some(Duration::from_millis(delay_ms));
851            }
852        }
853
854        Ok(())
855    }
856    /// Prioritize tasks based on observer position in Tonnetz space
857    pub fn prioritize_tonnetz_tasks(&mut self) -> crate::Result<()>
858    where
859        D: TriadDriver,
860    {
861        // 1. Find observer position (a node with Observer role)
862        let mut observer_position = None;
863
864        for (&edge_id, node) in self.fragment.partitions() {
865            if matches!(node.operator(), crate::actors::Operator::Observer(_)) {
866                observer_position = Some(edge_id);
867                break;
868            }
869        }
870
871        // If no observer found, use any node
872        let observer_position = observer_position.unwrap_or_else(|| {
873            self.fragment
874                .partitions()
875                .keys()
876                .next()
877                .copied()
878                .unwrap_or_default()
879        });
880
881        // 2. Collect tasks and their affected nodes first
882        let mut task_adjustments = Vec::new();
883
884        for (idx, task) in self.task_queue().iter().enumerate() {
885            if !matches!(task.status, TaskStatus::Queued) {
886                continue;
887            }
888
889            // Get the node ID affected by this task
890            let affected_node = match &task.kind {
891                TaskType::Transform { node_id, .. } => Some(*node_id),
892                TaskType::OptimizeNode { node_id, .. } => Some(*node_id),
893                TaskType::ChangeNodeMode(node_id, _) => Some(*node_id),
894                TaskType::LearnPattern { node_id, .. } => Some(*node_id),
895                TaskType::EvaluateEnsemble { node_id, .. } => Some(*node_id),
896                _ => None, // Tasks that don't target specific nodes
897            };
898
899            if let Some(node_id) = affected_node {
900                // Calculate voice-leading distance between observer and affected node
901                if let Some(distance) = self.voice_leading_distance(&observer_position, &node_id) {
902                    // Convert to octave-based distance factor
903                    let octave_layer = distance as f32 / 12.0;
904                    let distance_factor = (1.0 + octave_layer).recip();
905
906                    // Calculate adjusted priority
907                    let base_priority = task.priority as f32;
908                    let new_priority = (base_priority * distance_factor).round() as usize;
909
910                    // Store index and new priority for later application
911                    task_adjustments.push((idx, new_priority));
912                }
913            }
914        }
915
916        // 3. Apply the priority adjustments after computation is done
917        for (idx, priority) in task_adjustments {
918            if let Some(task) = self.task_queue_mut().get_mut(idx) {
919                task.effective_priority = priority;
920            }
921        }
922
923        // 4. Re-sort task queue
924        self.task_queue_mut()
925            .sort_by(|a, b| b.effective_priority.cmp(&a.effective_priority));
926
927        Ok(())
928    }
929    /// Process messages between nodes
930    pub fn process_messages(&mut self) -> crate::Result<()>
931    where
932        D: TriadDriver,
933    {
934        self.orchestrator.process_messages(&mut self.fragment)
935    }
936    /// Run maintenance including knowledge coordination
937    pub fn run_maintenance(&mut self) -> crate::Result<Vec<usize>>
938    where
939        D: TriadDriver,
940    {
941        let mut task_ids = Vec::new();
942
943        // 1. Memory optimization
944        let memory_task_id = self.schedule_task(
945            TaskType::OptimizeMemory { max_features: 5000 },
946            3, // Low priority
947        );
948        task_ids.push(memory_task_id);
949
950        // 2. Knowledge transfer (higher priority than memory optimization)
951        let knowledge_task_id = self.schedule_knowledge_transfer(6);
952        task_ids.push(knowledge_task_id);
953
954        // 3. Resource balancing
955        let balance_task_id = self.schedule_task(
956            TaskType::BalanceResources,
957            4, // Medium priority
958        );
959        task_ids.push(balance_task_id);
960
961        Ok(task_ids)
962    }
963    /// Get detailed resource metrics
964    pub fn runtime_metrics(&self) -> crate::ResourceMetrics {
965        crate::ResourceMetrics::default()
966            .with_cpu_usage(self.system_info.get_cpu_usage())
967            .with_memory_usage(self.system_info.memory_usage())
968            .with_available_memory(self.system_info.get_free_memory_mb())
969            .with_partition_count(self.fragment.total_partitions())
970            .with_edge_count(self.fragment.total_edges())
971            .with_vertex_count(self.fragment.total_nodes())
972            .with_queued_tasks(self.task_queue().len())
973            .with_running_tasks(self.orchestrator.active_task_count())
974            .with_learning_nodes(
975                self.fragment
976                    .partitions()
977                    .values()
978                    .filter(|node| node.has_surface_network())
979                    .count(),
980            )
981            .with_feature_count(
982                self.fragment
983                    .partitions()
984                    .values()
985                    .map(|node| node.total_features())
986                    .sum::<usize>(),
987            )
988    }
989    /// Schedule a knowledge transfer task with the given priority
990    pub fn schedule_knowledge_transfer(&mut self, priority: usize) -> usize {
991        // Schedule a task for the next step
992        self.schedule_task(TaskType::CoordinateLearning, priority)
993    }
994    /// Schedule memory optimization based on current memory usage
995    pub fn schedule_memory_optimization(&mut self) -> crate::Result<usize> {
996        // Calculate total features across the fragment
997        let total_features: usize = self
998            .fragment
999            .partitions()
1000            .values()
1001            .map(|node| node.total_features())
1002            .sum();
1003
1004        // Get node count for average calculation
1005        let node_count = self.fragment.partitions().len().max(1);
1006
1007        // Calculate average features per node
1008        let avg_features_per_node = total_features / node_count;
1009
1010        // Set target max features - higher for busy fragments
1011        let max_features = if avg_features_per_node > 1000 {
1012            // Heavy memory usage - be more aggressive
1013            avg_features_per_node * 2 / 3
1014        } else {
1015            // Normal usage - be conservative
1016            avg_features_per_node * 3 / 4
1017        }
1018        .max(500); // Ensure reasonable minimum
1019
1020        // Schedule the optimization task with appropriate priority
1021        let priority = if avg_features_per_node > 2000 {
1022            // High memory pressure - higher priority
1023            7
1024        } else {
1025            // Normal memory pressure - standard maintenance priority
1026            3
1027        };
1028
1029        // Schedule the task
1030        let task_id = self.schedule_task(TaskType::OptimizeMemory { max_features }, priority);
1031        Ok(task_id)
1032    }
1033    /// Schedule a task for execution
1034    pub fn schedule_task(&mut self, task_type: TaskType, priority: usize) -> usize {
1035        // Use the scheduler to create and queue the task
1036        self.scheduler.create_task(task_type, priority)
1037    }
1038    /// Generate a comprehensive system health report with performance insights
1039    pub fn system_health_report(&self) -> String {
1040        let metrics = self.runtime_metrics();
1041        let info = self.system_info();
1042        let timestamp = Timestamp::<u64>::now();
1043
1044        let mut report = String::new();
1045
1046        // Header with timestamp
1047        report.push_str(&format!("System Health Report - {}\n", timestamp));
1048        report.push_str("==================================\n\n");
1049
1050        // System resources section
1051        report.push_str("SYSTEM RESOURCES\n");
1052        report.push_str("-----------------\n");
1053        report.push_str(&format!("CPU Cores:         {}\n", info.get_cpu_cores()));
1054        report.push_str(&format!(
1055            "Memory:            {} MB\n",
1056            info.get_free_memory_mb()
1057        ));
1058        report.push_str(&format!(
1059            "CPU Usage:         {:.1}%\n",
1060            metrics.cpu_usage() * 100.0
1061        ));
1062        report.push_str(&format!(
1063            "Memory Usage:      {:.1}%\n",
1064            metrics.memory_usage() * 100.0
1065        ));
1066        report.push_str(&format!(
1067            "Available Compute: {}%\n\n",
1068            metrics.available_compute()
1069        ));
1070
1071        // Fragment topology section
1072        report.push_str("TONNETZ TOPOLOGY\n");
1073        report.push_str("----------------\n");
1074        report.push_str(&format!(
1075            "Partitions:        {}\n",
1076            metrics.partition_count()
1077        ));
1078        report.push_str(&format!("Vertices:          {}\n", metrics.vertex_count()));
1079        report.push_str(&format!("Edges:             {}\n\n", metrics.edge_count()));
1080
1081        // Runtime activity section
1082        report.push_str("RUNTIME ACTIVITY\n");
1083        report.push_str("----------------\n");
1084        report.push_str(&format!("Queued Tasks:      {}\n", metrics.queued_tasks()));
1085        report.push_str(&format!("Running Tasks:     {}\n", metrics.running_tasks()));
1086
1087        // Get task distribution
1088        let mut task_counts = std::collections::HashMap::new();
1089        for task in self.task_queue() {
1090            let key = TaskTypeKey::from(&task.kind);
1091            *task_counts.entry(key).or_insert(0) += 1;
1092        }
1093
1094        // Show task distribution if there are any tasks
1095        if !task_counts.is_empty() {
1096            report.push_str("Task Distribution:\n");
1097            for (task_type, count) in task_counts {
1098                report.push_str(&format!("  - {}: {}\n", task_type, count));
1099            }
1100        }
1101        report.push('\n');
1102
1103        // Learning status
1104        report.push_str("LEARNING STATUS\n");
1105        report.push_str("--------------\n");
1106        report.push_str(&format!(
1107            "Learning Nodes:     {}\n",
1108            metrics.learning_nodes()
1109        ));
1110        report.push_str(&format!(
1111            "Surface Networks:   {}\n",
1112            metrics.surface_network_count()
1113        ));
1114        report.push_str(&format!(
1115            "Features:          {}\n\n",
1116            metrics.feature_count()
1117        ));
1118
1119        // Capabilities
1120        report.push_str("CAPABILITIES\n");
1121        report.push_str("-----------\n");
1122        for capability in metrics.capabilities() {
1123            report.push_str(&format!("- {}\n", capability));
1124        }
1125        report.push('\n');
1126
1127        // Performance assessment
1128        report.push_str("PERFORMANCE ASSESSMENT\n");
1129        report.push_str("---------------------\n");
1130
1131        // Memory pressure assessment
1132        let memory_ratio =
1133            (metrics.feature_count() as f32) / (1000.0 * metrics.partition_count().max(1) as f32);
1134        let memory_status = if memory_ratio > 2.0 {
1135            "HIGH - Consider memory optimization"
1136        } else if memory_ratio > 1.0 {
1137            "MODERATE - Monitor memory growth"
1138        } else {
1139            "GOOD - Memory usage is acceptable"
1140        };
1141        report.push_str(&format!("Memory Pressure:    {}\n", memory_status));
1142
1143        // CPU pressure assessment
1144        let cpu_status = if metrics.cpu_usage() > &0.8 {
1145            "HIGH - Consider scaling out"
1146        } else if metrics.cpu_usage() > &0.5 {
1147            "MODERATE - Monitor CPU usage"
1148        } else {
1149            "GOOD - CPU usage is acceptable"
1150        };
1151        report.push_str(&format!("CPU Pressure:       {}\n", cpu_status));
1152
1153        // Task backlog assessment
1154        let task_ratio = metrics.queued_tasks() as f32 / (info.get_cpu_cores() * 2) as f32;
1155        let task_status = if task_ratio > 3.0 {
1156            "HIGH - Large task backlog"
1157        } else if task_ratio > 1.0 {
1158            "MODERATE - Tasks may experience delays"
1159        } else {
1160            "GOOD - Task processing is timely"
1161        };
1162        report.push_str(&format!("Task Backlog:       {}\n", task_status));
1163
1164        report
1165    }
1166    /// Tune surface network parameters based on node performance
1167    pub fn tune_surface_networks(&mut self) -> crate::Result<()>
1168    where
1169        D: TriadDriver,
1170    {
1171        // Analyze performance of existing surface networks
1172        let mut performance_metrics = HashMap::new();
1173
1174        for (&edge_id, node) in self.fragment.partitions() {
1175            if node.has_surface_network() {
1176                // Get learning metrics for this node
1177                let accuracy = node.get_learning_accuracy();
1178                let convergence_rate = node.get_convergence_rate();
1179                let feature_quality = node.get_feature_quality();
1180
1181                // Combined performance score
1182                let performance = accuracy * 0.5 + convergence_rate * 0.3 + feature_quality * 0.2;
1183
1184                performance_metrics.insert(edge_id, performance);
1185            }
1186        }
1187
1188        // Find best and worst performing nodes
1189        if performance_metrics.is_empty() {
1190            return Ok(());
1191        }
1192
1193        let mut entries: Vec<_> = performance_metrics.iter().collect();
1194        entries.sort_by(|a, b| b.1.partial_cmp(a.1).unwrap_or(std::cmp::Ordering::Equal));
1195
1196        let best_node_id = *entries.first().unwrap().0;
1197
1198        // Get parameters from best node
1199        if let Some(best_node) = self.fragment().get_vnode(&best_node_id) {
1200            let optimal_params = best_node.get_surface_parameters();
1201
1202            // Apply to lower performing nodes
1203            for (edge_id, performance) in performance_metrics {
1204                // Only adjust nodes with below-average performance
1205                if performance < 0.7 && edge_id != best_node_id {
1206                    if let Some(node) = self.fragment_mut().get_vnode_mut(&edge_id) {
1207                        // Adjust toward optimal parameters
1208                        node.adjust_surface_parameters(&optimal_params, 0.3)?;
1209                    }
1210                }
1211            }
1212        }
1213
1214        Ok(())
1215    }
1216    /// compute the vocal leading distance between two facets; see
1217    /// [voice_leading_distance](crate::FragTonnetz::voice_leading_distance) for more information
1218    pub fn voice_leading_distance(&self, from: &EdgeId, to: &EdgeId) -> Option<usize> {
1219        self.fragment().voice_leading_distance(from, to)
1220    }
1221}
1222
1223impl<D> RuntimeInner<D>
1224where
1225    D: Driver<Triad>,
1226{
1227    /// Compute a hash representing the consensus state of knowledge
1228    #[doc(hidden)]
1229    pub fn compute_knowledge_consensus_hash(
1230        &self,
1231        patterns: &HashMap<EdgeId, Vec<Vec<usize>>>,
1232    ) -> [u8; 32] {
1233        use std::collections::hash_map::DefaultHasher;
1234        use std::hash::Hasher;
1235
1236        // Simple hash implementation - would be replaced by cryptographic consensus
1237        let mut hasher = DefaultHasher::new();
1238
1239        // Order by node ID for determinism
1240        let mut ordered_nodes: Vec<_> = patterns.keys().collect();
1241        ordered_nodes.sort();
1242
1243        for &node_id in &ordered_nodes {
1244            if let Some(node_patterns) = patterns.get(node_id) {
1245                // Hash node patterns
1246                hasher.write_usize(**node_id);
1247                hasher.write_usize(node_patterns.len());
1248
1249                for pattern in node_patterns {
1250                    hasher.write_usize(pattern.len());
1251                    for &value in pattern {
1252                        hasher.write_usize(value);
1253                    }
1254                }
1255            }
1256        }
1257
1258        // Convert to 32 byte array (simulating SHA-256)
1259        let hash_value = hasher.finish();
1260        let mut result = [0u8; 32];
1261        let bytes = hash_value.to_be_bytes();
1262        result[..bytes.len()].copy_from_slice(&bytes);
1263
1264        result
1265    }
1266    // TODO: integrate with consensus mechanisms & hybrid-multichain models
1267    /// Register a knowledge transfer event
1268    #[allow(dead_code)]
1269    fn register_knowledge_transfer(&mut self, consensus_hash: [u8; 32], node_count: usize) {
1270        // Record the transfer in system metrics
1271        let timestamp = Timestamp::<u64>::now();
1272
1273        // Store in runtime properties
1274        let transfer_key = format!("knowledge_transfer_{timestamp}");
1275        let transfer_value = format!("{:?}:{}", consensus_hash, node_count);
1276
1277        // Store this in the fragment's properties
1278        for node in self.fragment.partitions_mut().values_mut() {
1279            node.store_mut()
1280                .set_property(&transfer_key, &transfer_value);
1281        }
1282
1283        // Log the transfer
1284        #[cfg(feature = "tracing")]
1285        tracing::info!(
1286            "Knowledge transfer completed: {} nodes, consensus: {:?}",
1287            node_count,
1288            &consensus_hash[0..4]
1289        );
1290    }
1291}
1292
1293impl<D> Default for RuntimeInner<D>
1294where
1295    D: Driver<Triad>,
1296{
1297    fn default() -> Self {
1298        Self::new()
1299    }
1300}
1301
1302impl<D> eryon::Execute<usize> for RuntimeInner<D>
1303where
1304    D: TriadDriver,
1305{
1306    type Output = crate::Result;
1307    fn execute(mut self, rhs: usize) -> Self::Output {
1308        self.execute_task(rhs)
1309    }
1310}
1311
1312impl<D> eryon::ExecuteMut<usize> for RuntimeInner<D>
1313where
1314    D: TriadDriver,
1315{
1316    type Output = crate::Result;
1317    fn execute_mut(&mut self, rhs: usize) -> Self::Output {
1318        self.execute_task(rhs)
1319    }
1320}