eryon_rt/runtime/
impl_inner.rs

1/*
2    Appellation: impl_inner <module>
3    Contrib: @FL03
4*/
5use crate::runtime::RuntimeInner;
6
7use crate::actors::prelude::{Driver, TriadDriver, VNode};
8use crate::fragment::FragTonnetz;
9use crate::nrt::Triad;
10use crate::orcha::Orchestrator;
11use crate::scheduler::Scheduler;
12use crate::tasks::{Task, TaskStatus, TaskType, TaskTypeKey};
13use crate::types::{PerformanceTracker, SystemInfo};
14use rshyper::EdgeId;
15use std::collections::HashMap;
16use std::time::{Duration, Instant};
17
18impl<D> RuntimeInner<D>
19where
20    D: Driver<Triad>,
21{
22    /// Create a new runtime
23    pub fn new() -> Self {
24        let fragment = FragTonnetz::new();
25        let orchestrator = Orchestrator::new();
26        let system_info = SystemInfo::new();
27        let perf_tracker = PerformanceTracker::new();
28
29        Self {
30            fragment,
31            orchestrator,
32            scheduler: Scheduler::new(),
33            perf_tracker,
34            system_info,
35        }
36    }
37    /// Get a reference to the fragment
38    pub fn fragment(&self) -> &FragTonnetz<D> {
39        &self.fragment
40    }
41    /// Get a mutable reference to the fragment
42    pub fn fragment_mut(&mut self) -> &mut FragTonnetz<D> {
43        &mut self.fragment
44    }
45    /// Get a reference to the orchestrator
46    pub fn orchestrator(&self) -> &Orchestrator {
47        &self.orchestrator
48    }
49    /// returns a mutable reference to the orchestrator
50    pub fn orchestrator_mut(&mut self) -> &mut Orchestrator {
51        &mut self.orchestrator
52    }
53    /// returns an immutable reference to the performance tracker
54    pub const fn perf_tracker(&self) -> &PerformanceTracker {
55        &self.perf_tracker
56    }
57    /// returns a mutable reference to the performance tracker
58    pub fn perf_tracker_mut(&mut self) -> &mut PerformanceTracker {
59        &mut self.perf_tracker
60    }
61    /// Get a performance trend report
62    pub fn performance_trend_report(&self) -> String {
63        self.perf_tracker.generate_trend_report()
64    }
65    /// returns a reference to the scheduler
66    pub const fn scheduler(&self) -> &Scheduler {
67        &self.scheduler
68    }
69    /// returns a mutable reference to the scheduler
70    pub fn scheduler_mut(&mut self) -> &mut Scheduler {
71        &mut self.scheduler
72    }
73    /// returns an immutable reference to the system's info
74    pub const fn system_info(&self) -> &SystemInfo {
75        &self.system_info
76    }
77    /// returns a mutable reference to the system's info
78    pub fn system_info_mut(&mut self) -> &mut SystemInfo {
79        &mut self.system_info
80    }
81    /// returns a reference to the tasks stored in the scheduler
82    pub fn task_queue(&self) -> &[Task] {
83        &self.scheduler.task_queue
84    }
85    /// returns a mutable reference to the task queue within the scheduler
86    pub fn task_queue_mut(&mut self) -> &mut Vec<Task> {
87        &mut self.scheduler.task_queue
88    }
89    /// returns an immutable reference to the initialized virtual node's
90    pub fn vnodes(&self) -> &HashMap<EdgeId, VNode<D>> {
91        self.fragment.partitions()
92    }
93    /// returns a mutable reference to the initialized virtual nodes
94    pub fn vnodes_mut(&mut self) -> &mut HashMap<EdgeId, VNode<D>> {
95        self.fragment.partitions_mut()
96    }
97    /// set the runtimes fragment to the given value
98    pub fn set_fragment(&mut self, fragment: FragTonnetz<D>) {
99        self.fragment = fragment;
100    }
101    /// set the runtime's orchestrator
102    pub fn set_orchestrator(&mut self, orchestrator: Orchestrator) {
103        self.orchestrator = orchestrator;
104    }
105    /// set the runtime's scheduler
106    pub fn set_scheduler(&mut self, scheduler: Scheduler) {
107        self.scheduler = scheduler;
108    }
109    /// consumes the current instance to create another with the given fragment
110    pub fn with_fragment(self, fragment: FragTonnetz<D>) -> Self {
111        Self { fragment, ..self }
112    }
113    /// consumes the current instance to create another with the given orchestrator
114    pub fn with_orchestrator(self, orchestrator: Orchestrator) -> Self {
115        Self {
116            orchestrator,
117            ..self
118        }
119    }
120    /// consumes the current instance to create another with the given scheduler
121    pub fn with_scheduler(self, scheduler: Scheduler) -> Self {
122        Self { scheduler, ..self }
123    }
124    /// Update performance tracking with current metrics
125    pub fn update_performance_metrics(&mut self) {
126        // Get current metrics
127        let metrics = self.runtime_metrics();
128
129        // Record in the tracker
130        self.perf_tracker.record_metrics(metrics);
131    }
132}
133
134impl<D> RuntimeInner<D>
135where
136    D: Driver<Triad>,
137{
    /// Adaptive task scheduling based on system resources and task performance history.
    ///
    /// Recomputes the `effective_priority` of every queued task by scaling its base
    /// priority with four weighted factors — expected execution time (30%), overall
    /// system load (20%), task age (20%), and target-node load (30%) — then re-sorts
    /// the queue so the highest effective priority runs first.
    pub fn adaptive_schedule(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Track historical execution times by task type discriminant (TaskTypeKey)
        let mut task_type_metrics = HashMap::new();

        // Get completed tasks from scheduler
        for completed_task in self.scheduler.completed_tasks() {
            // fall back to "now" / creation time when timestamps are missing so the
            // duration computation below can never panic
            let execution_time = completed_task
                .completed_at
                .unwrap_or_else(Instant::now)
                .duration_since(
                    completed_task
                        .started_at
                        .unwrap_or(completed_task.created_at),
                );

            // Use TaskTypeKey for the hashmap key instead of TaskType
            let key = TaskTypeKey::from(&completed_task.kind);

            // accumulate (total duration, sample count) per task kind
            let entry = task_type_metrics
                .entry(key)
                .or_insert_with(|| (Duration::from_secs(0), 0));

            entry.0 += execution_time;
            entry.1 += 1;
        }

        // Calculate average execution times
        let mut avg_execution_times = HashMap::new();
        for (key, (total_time, count)) in task_type_metrics {
            if count > 0 {
                let avg_time = total_time / count as u32;
                avg_execution_times.insert(key, avg_time);
            }
        }

        // Calculate current system load
        // assumes `get_current_load` is normalized to [0, 1] — TODO confirm
        let system_load = self.system_info.get_current_load();
        let node_loads = self.fragment.get_node_loads();

        // For each queued task, adjust priority
        let now = Instant::now();
        for task in self.task_queue_mut() {
            if matches!(task.status, TaskStatus::Queued) {
                let base_priority = task.priority as f32;

                // Factor 1: Expected execution time - use TaskTypeKey
                let key = TaskTypeKey::from(&task.kind);
                let time_factor = if let Some(avg_time) = avg_execution_times.get(&key) {
                    // Normalize: shorter tasks get higher priority
                    let max_time = Duration::from_secs(60); // Assume 60s max task time
                    1.0 - (avg_time.as_secs_f32() / max_time.as_secs_f32()).min(1.0)
                } else {
                    0.5 // Default for unknown task types
                };

                // Factor 2: System load (a lightly-loaded system boosts priority)
                let load_factor = 1.0 - system_load;

                // Factor 3: Task age (older tasks get boosted to avoid starvation)
                let age = now.duration_since(task.created_at);
                let age_factor = (age.as_secs_f32() / 60.0).min(1.0); // Max boost after 60 seconds

                // Factor 4: Target node resource availability
                let node_factor = match &task.kind {
                    TaskType::Transform { node_id, .. } => {
                        1.0 - node_loads.get(node_id).copied().unwrap_or(0.5)
                    }
                    TaskType::LearnPattern { node_id, .. } => {
                        1.0 - node_loads.get(node_id).copied().unwrap_or(0.5)
                    }
                    _ => 0.5, // Default for tasks not targeting specific nodes
                };

                // Compute adjusted priority as a weighted blend of the four factors
                let adjusted_priority = base_priority
                    * (0.3 * time_factor
                        + 0.2 * load_factor
                        + 0.2 * age_factor
                        + 0.3 * node_factor);

                // Update task's effective priority (but keep original base priority)
                task.effective_priority = adjusted_priority.round() as usize;
            }
        }

        // Sort by effective priority (higher first)
        self.task_queue_mut()
            .sort_by(|a, b| b.effective_priority.cmp(&a.effective_priority));

        Ok(())
    }
233    /// Add a triad to the fragment
234    pub fn add_triad(&mut self, triad: Triad) -> crate::Result<rshyper::EdgeId> {
235        self.fragment.add_triad(triad)
236    }
237    /// Auto-configure the runtime based on available system resources
238    pub fn auto_configure(&mut self) -> crate::Result<()> {
239        // Calculate optimal number of partitions based on CPU cores
240        let optimal_partitions = self.system_info.get_cpu_cores().max(1);
241
242        // If we already have enough partitions, we're good
243        if self.fragment.partition_count() >= optimal_partitions {
244            return Ok(());
245        }
246
247        // Otherwise, initialize with all 24 major and minor triads
248        self.init_complete_layer()
249    }
250
    /// returns every task the scheduler has marked as completed
    pub fn completed_tasks(&self) -> Vec<&Task> {
        self.scheduler.completed_tasks()
    }
    /// Coordinate distributed learning with gradient sharing.
    ///
    /// Groups surface-bearing nodes by chord class, gathers each group's learning
    /// gradients, redistributes the grouped gradients to every member, and schedules
    /// follow-up evaluation and pattern-sharing tasks. Returns the id of the final
    /// `SharePatterns` task.
    ///
    /// # Errors
    ///
    /// Propagates the first failure from gradient extraction or application,
    /// aborting any remaining groups.
    pub fn coordinate_distributed_learning(&mut self) -> crate::Result<usize>
    where
        D: TriadDriver,
    {
        // Group nodes by chord class
        let mut nodes_by_class = std::collections::HashMap::new();

        for (&edge_id, node) in self.fragment.partitions() {
            // only surface-bearing nodes can participate in learning
            if node.has_surface_network() {
                nodes_by_class
                    .entry(node.class())
                    .or_insert_with(Vec::new)
                    .push(edge_id);
            }
        }

        // For each class, coordinate learning
        for (_class, node_ids) in nodes_by_class {
            // a lone node has no peer to learn from
            if node_ids.len() <= 1 {
                continue;
            }

            // 1. Collect gradients from all nodes
            let mut all_gradients = Vec::new();

            for &node_id in &node_ids {
                if let Some(node) = self.fragment.find_node_by_id(node_id) {
                    let gradients = node.extract_learning_gradients()?;
                    all_gradients.push((node_id, gradients));
                }
            }

            // 2. Average the gradients
            // NOTE(review): this step only groups gradients per pattern id; the actual
            // averaging presumably happens inside `apply_federated_gradients` — confirm
            let mut averaged_gradients = HashMap::new();

            for (_node_id, gradients) in all_gradients {
                for (pattern_id, gradient) in gradients {
                    averaged_gradients
                        .entry(pattern_id)
                        .or_insert_with(Vec::new)
                        .push(gradient);
                }
            }

            // 3. Apply averaged gradients back to nodes
            for &node_id in &node_ids {
                if let Some(node) = self.fragment.find_node_by_id_mut(node_id) {
                    node.apply_federated_gradients(&averaged_gradients)?;
                }
            }

            // 4. Schedule evaluation task (priority 8) for this class
            self.schedule_task(TaskType::EvaluatePerformance, 8);
        }

        // Record this coordination activity (priority 6)
        let task_id = self.schedule_task(TaskType::SharePatterns, 6);

        Ok(task_id)
    }
    /// Coordinate knowledge sharing between surface networks.
    ///
    /// Selects nodes that carry a surface network with more than 10 features,
    /// extracts their knowledge patterns, and transfers a proximity-weighted
    /// subset of each source's patterns to every other selected node. Proximity
    /// and transfer sets are computed up front so the mutable integration pass
    /// never overlaps an immutable borrow of the fragment.
    pub fn coordinate_knowledge_transfer(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // 1. Identify knowledge-bearing nodes (surface network + >10 features)
        let nodes_with_knowledge: Vec<_> = self
            .fragment
            .partitions()
            .iter()
            .filter(|(_, node)| node.has_surface_network() && node.feature_count() > 10)
            .map(|(&id, _)| id)
            .collect();

        if nodes_with_knowledge.len() <= 1 {
            return Ok(()); // Nothing to coordinate
        }

        // 2. Extract knowledge patterns (per-node failures are logged and skipped)
        let mut knowledge_patterns = HashMap::new();
        for &node_id in &nodes_with_knowledge {
            if let Some(node) = self.fragment.find_node_by_id(node_id) {
                // Extract discrete pattern representations
                match node.extract_knowledge_patterns() {
                    Ok(patterns) => {
                        if !patterns.is_empty() {
                            knowledge_patterns.insert(node_id, patterns);
                        }
                    }
                    Err(_e) => {
                        #[cfg(feature = "tracing")]
                        tracing::warn!(
                            "Failed to extract knowledge from node {}: {:?}",
                            node_id,
                            _e
                        );
                        continue;
                    }
                }
            }
        }

        if knowledge_patterns.is_empty() {
            return Ok(());
        }

        // 3. Generate a consensus hash for verification
        let consensus_hash = self.compute_knowledge_consensus_hash(&knowledge_patterns);

        // 4. Calculate all proximities first to avoid borrowing conflicts
        let mut transfer_data = Vec::new();

        for &target_id in &nodes_with_knowledge {
            let mut source_pattern_data = Vec::new();

            for (&source_id, patterns) in &knowledge_patterns {
                if source_id != target_id {
                    // Calculate proximity in the tonal space
                    let proximity = if let Some(distance) =
                        self.fragment.voice_leading_distance(target_id, source_id)
                    {
                        1.0 / (1.0 + distance as f32 / 6.0) // Normalize by octave divisions
                    } else {
                        0.5 // Default if we can't calculate distance
                    };

                    // Only integrate if there's significant proximity (> 0.3)
                    if proximity > 0.3 {
                        // Select patterns to integrate based on proximity
                        let pattern_count = (patterns.len() as f32 * proximity).ceil() as usize;
                        if pattern_count > 0 {
                            let selected_patterns = if pattern_count >= patterns.len() {
                                patterns.clone()
                            } else {
                                // Take most important patterns first
                                // NOTE(review): this relies on `patterns` being pre-sorted
                                // by importance — confirm against the extractor
                                patterns.iter().take(pattern_count).cloned().collect()
                            };

                            source_pattern_data.push((source_id, selected_patterns));
                        }
                    }
                }
            }

            // Store all transfer data for this target
            if !source_pattern_data.is_empty() {
                transfer_data.push((target_id, source_pattern_data));
            }
        }

        // 5. Now perform the actual knowledge transfer without concurrent borrowing
        for (target_id, source_patterns) in transfer_data {
            if let Some(target_node) = self.fragment.find_node_by_id_mut(target_id) {
                // Apply knowledge from sources; integration failures are logged,
                // not propagated, so one bad transfer can't abort the rest
                for (source_id, patterns) in source_patterns {
                    if let Err(e) = target_node.integrate_external_knowledge(&patterns, source_id) {
                        #[cfg(feature = "tracing")]
                        tracing::error!("Failed to integrate knowledge: {:?}", e);
                        #[cfg(not(feature = "tracing"))]
                        eprintln!("Failed to integrate knowledge: {:?}", e);
                    }
                }
            }
        }

        // 6. Register the knowledge transfer event
        self.register_knowledge_transfer(consensus_hash, nodes_with_knowledge.len());

        Ok(())
    }
    /// Coordinate learning between nodes.
    ///
    /// A lighter-weight variant of knowledge coordination: patterns are
    /// extracted once per knowledge-bearing node, then pushed to a bounded
    /// number of peers (at most 5) regardless of tonal proximity. Each node's
    /// memory records the coordination event afterwards.
    pub fn coordinate_learning(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // 1. Identify knowledge-bearing nodes (surface network + any features)
        let nodes_with_knowledge: Vec<_> = self
            .fragment
            .partitions()
            .iter()
            .filter(|(_, node)| node.has_surface_network() && node.feature_count() > 0)
            .map(|(&id, _)| id)
            .collect();

        if nodes_with_knowledge.len() <= 1 {
            return Ok(()); // Nothing to coordinate
        }

        // 2. Extract patterns once per node - this is our main optimization
        let mut extracted_patterns = Vec::with_capacity(nodes_with_knowledge.len());

        for &node_id in &nodes_with_knowledge {
            if let Some(node) = self.find_node(node_id) {
                match node.extract_knowledge_patterns() {
                    Ok(patterns) if !patterns.is_empty() => {
                        extracted_patterns.push((node_id, patterns));
                    }
                    _ => continue,
                }
            }
        }

        // 3. Use a constant limit on targets per source to control complexity
        // (the `len() - 1` cannot underflow: the `<= 1` guard above returned early)
        let max_targets_per_source = std::cmp::min(5, nodes_with_knowledge.len() - 1);

        // 4. Perform transfers using the extracted patterns
        for (source_id, patterns) in extracted_patterns {
            // Skip empty pattern sets
            if patterns.is_empty() {
                continue;
            }

            // Choose target nodes (limited number, in iteration order)
            let mut target_count = 0;

            for &target_id in &nodes_with_knowledge {
                if target_id != source_id && target_count < max_targets_per_source {
                    if let Some(target_node) = self.find_node_mut(target_id) {
                        // integration failures are logged, not propagated
                        if let Err(_e) =
                            target_node.integrate_external_knowledge(&patterns, source_id)
                        {
                            #[cfg(feature = "tracing")]
                            tracing::error!("Failed to integrate knowledge: {_e:?}");
                            #[cfg(not(feature = "tracing"))]
                            eprintln!("Failed to integrate knowledge: {_e:?}");
                        }

                        target_count += 1;
                    }
                }
            }
        }

        // 5. Record coordination event in every node's memory
        for node in self.fragment_mut().partitions_mut().values_mut() {
            node.memory_mut().record_event(
                "knowledge_coordination",
                Some(vec![nodes_with_knowledge.len()]),
            );
        }

        Ok(())
    }
498    /// executes the task associated with the given identitifier
499    pub fn execute_task(&mut self, task_id: usize) -> crate::Result<()>
500    where
501        D: TriadDriver,
502    {
503        let task = self
504            .scheduler
505            .get_task(task_id)
506            .ok_or_else(|| crate::RuntimeError::TaskNotFound(task_id))?;
507        self.orchestrator.execute_task(&mut self.fragment, task)
508    }
509    /// Execute tasks in the queue
510    pub fn execute_tasks(&mut self) -> crate::Result<()>
511    where
512        D: TriadDriver,
513    {
514        // Use the scheduler to process tasks
515        let _res = self
516            .scheduler
517            .process_batch(&mut self.fragment, &mut self.orchestrator)?;
518        Ok(())
519    }
520    /// returns an immutable reference to the node with the given identifier; if any
521    pub fn find_node(&self, id: rshyper::Index) -> Option<&VNode<D>> {
522        self.fragment().find_node_by_id(id)
523    }
524    /// returns a mutable reference to the partition with the given identifier; if any
525    pub fn find_node_mut(&mut self, id: rshyper::Index) -> Option<&mut VNode<D>> {
526        self.fragment_mut().find_node_by_id_mut(id)
527    }
528    /// Initialize the runtime with major and minor triads
529    pub fn init_complete_layer(&mut self) -> crate::Result<()> {
530        // Initialize the fragment with all triads
531        self.fragment.initialize_with_all_triads()?;
532
533        // Initialize the orchestrator
534        self.orchestrator.initialize(&self.fragment);
535
536        Ok(())
537    }
538    /// Initialize surface networks for all nodes in the fragment
539    pub fn init_node_surfaces(&mut self) -> crate::Result<()>
540    where
541        D: TriadDriver,
542    {
543        let mut initialized_count = 0;
544
545        // Iterate through all partitions
546        for (_, node) in self.fragment.partitions_mut() {
547            // Skip nodes that already have a surface network
548            if node.has_surface_network() {
549                continue;
550            }
551
552            // Initialize the surface network for this node
553            node.init_surface()?;
554
555            // Track how many we've initialized
556            initialized_count += 1;
557        }
558
559        if initialized_count > 0 {
560            #[cfg(feature = "tracing")]
561            tracing::debug!("Initialized {} node surface networks", initialized_count);
562        }
563
564        Ok(())
565    }
566    /// Initialize surface networks with class-specific tuning
567    pub fn init_specialized_surfaces(&mut self) -> crate::Result<()>
568    where
569        D: TriadDriver,
570    {
571        use crate::nrt::Triads::*;
572        // Group nodes by chord class
573        let mut nodes_by_class = std::collections::HashMap::new();
574
575        for (&edge_id, node) in self.fragment.partitions() {
576            nodes_by_class
577                .entry(node.class())
578                .or_insert_with(Vec::new)
579                .push(edge_id);
580        }
581
582        // Parameters optimized for each chord class
583        let class_params: HashMap<_, _> = [
584            // Major triads: More emphasis on dominant transformations
585            (Major, (0.02, 0.6, 0.4)),
586            // Minor triads: More sensitivity to subdominant transformations
587            (Minor, (0.01, 0.8, 0.5)),
588            // Diminished triads: Higher resolution for chromatic movement
589            (Diminished, (0.04, 0.7, 0.6)),
590            // Augmented triads: Focus on symmetrical properties
591            (Augmented, (0.05, 0.5, 0.8)),
592        ]
593        .into_iter()
594        .collect();
595
596        // Apply specialized parameters for each node
597        for (class, node_ids) in &nodes_by_class {
598            let params = class_params.get(&class).unwrap_or(&(0.01, 0.7, 0.5));
599
600            for &edge_id in node_ids {
601                if let Some(node) = self.fragment.find_node_by_id_mut(edge_id) {
602                    if !node.has_surface_network() {
603                        let config = crate::actors::surface::SurfaceModelConfig::new(
604                            params.0, params.1, params.2,
605                        );
606                        // Initialize with class-specific parameters
607                        node.surface_mut().map(|i| i.model_mut().set_config(config));
608                    }
609                }
610            }
611        }
612
613        #[cfg(feature = "tracing")]
614        tracing::debug!(
615            "Initialized specialized surface networks for {} classes",
616            nodes_by_class.len()
617        );
618
619        Ok(())
620    }
621    /// Optimize memory allocation across nodes
622    pub fn optimize_memory_allocation(&mut self) -> crate::Result<()>
623    where
624        D: TriadDriver,
625    {
626        // Get current memory usage per node
627        let mut node_memory_usage = HashMap::new();
628        let mut total_features = 0;
629
630        for (&edge_id, node) in self.fragment.partitions() {
631            let feature_count = node.feature_count();
632            node_memory_usage.insert(edge_id, feature_count);
633            total_features += feature_count;
634        }
635
636        // Calculate average memory usage
637        if self.fragment.partition_count() == 0 {
638            return Ok(());
639        }
640
641        let avg_features = total_features / self.fragment.partition_count();
642        let target_max = (avg_features as f32 * 1.5) as usize; // Allow 50% above average
643
644        // Find nodes with excessive features
645        let mut nodes_to_optimize = Vec::new();
646        for (&edge_id, &feature_count) in &node_memory_usage {
647            if feature_count > target_max {
648                nodes_to_optimize.push((edge_id, feature_count));
649            }
650        }
651
652        // Sort by feature count (highest first)
653        nodes_to_optimize.sort_by(|a, b| b.1.cmp(&a.1));
654
655        // Optimize each node
656        for (edge_id, _) in nodes_to_optimize {
657            if let Some(node) = self.fragment.find_node_by_id_mut(edge_id) {
658                // Consolidate patterns with similar features
659                node.consolidate_similar_patterns(0.8)?;
660
661                // Prune low-importance features
662                node.prune_low_importance_features(0.2)?;
663            }
664        }
665
666        Ok(())
667    }
    /// Optimize the surface networks across all nodes.
    ///
    /// For each chord class, finds the surface-bearing node with the most
    /// learned features and queues `PatternShare` messages from it to every
    /// peer in the class, then drains the message queue.
    pub fn optimize_surface_networks(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Group nodes by chord class
        let mut nodes_by_class = std::collections::HashMap::new();

        // Analyze all nodes and group by chord class
        for (&edge_id, node) in self.fragment.partitions() {
            nodes_by_class
                .entry(node.class())
                .or_insert_with(Vec::new)
                .push(edge_id);
        }

        // For each class, find best-performing node and share patterns
        for (_class, node_ids) in nodes_by_class {
            // a lone node has no peer to share with
            if node_ids.len() <= 1 {
                continue;
            }

            // Find node with most learning (ties resolved by first-seen order)
            let mut best_node_id = None;
            let mut most_features = 0;

            for &id in &node_ids {
                if let Some(node) = self.fragment.find_node_by_id(id) {
                    let features = node.feature_count();
                    if features > most_features && node.has_surface_network() {
                        best_node_id = Some(id);
                        most_features = features;
                    }
                }
            }

            // Share patterns from best node to others
            if let Some(source_id) = best_node_id {
                for &target_id in &node_ids {
                    if target_id != source_id {
                        // Create message to share patterns
                        // NOTE(review): the pattern payload is sent empty and is
                        // expected to be populated downstream — confirm the message
                        // handler actually fills it from `source`
                        self.orchestrator
                            .send_message(crate::NodeMessage::PatternShare {
                                source: source_id,
                                pattern: vec![], // Will be populated by the node
                                importance: 0.8,
                            });
                    }
                }
            }
        }

        // Process the messages queued above
        self.process_messages()?;

        Ok(())
    }
725    /// Get predicted resource requirements for a task
726    pub fn predict_task_resources(&self, task_type: &TaskType) -> (f32, f32) {
727        // Use discriminant/TaskTypeKey to avoid floating-point issues in pattern matching
728        match TaskTypeKey::from(task_type) {
729            TaskTypeKey::Transform => (5.0, 50.0),
730            TaskTypeKey::LearnPattern => (20.0, 100.0),
731            TaskTypeKey::SharePatterns => (10.0, 80.0),
732            TaskTypeKey::OptimizeNode => (15.0, 30.0),
733            _ => (5.0, 20.0), // Default for unknown tasks
734        }
735    }
736    /// Schedule tasks using predictive resource management
737    pub fn predictive_schedule(&mut self, look_ahead_ms: u64) -> crate::Result<()>
738    where
739        D: TriadDriver,
740    {
741        // 1. Get current system state
742        let current_cpu = self.system_info.get_cpu_usage();
743        let current_memory = self.system_info.get_ram_memory_usage();
744        let available_memory =
745            self.system_info.get_free_memory_mb() as f32 * (1.0 - current_memory);
746
747        // 2. Estimate resource availability window
748        // TODO: find a way to use these values
749        let _cpu_headroom = 1.0 - current_cpu;
750        let _mem_headroom = available_memory;
751
752        // 3. Build resource timeline for currently executing tasks
753        let mut cpu_timeline = vec![current_cpu];
754        let mut mem_timeline = vec![current_memory];
755        let timeline_slots = 10; // Divide look-ahead into slots
756
757        // Get currently running tasks
758        let running_tasks = self.scheduler.get_running_tasks();
759
760        // Estimate when each task will complete and free resources
761        for slot in 1..timeline_slots {
762            let slot_time = Instant::now()
763                + Duration::from_millis(look_ahead_ms * slot as u64 / timeline_slots as u64);
764
765            // Start with current usage
766            let mut cpu_usage = current_cpu;
767            let mut mem_usage = current_memory;
768
769            // Adjust for tasks expected to complete by this time
770            for task in &running_tasks {
771                let task_started = task.started_at.unwrap_or(task.created_at);
772                let (task_cpu, task_mem) = self.predict_task_resources(&task.kind);
773
774                // Estimate task duration based on task type discriminant
775                let est_duration = {
776                    let task_key = TaskTypeKey::from(&task.kind);
777                    self.scheduler.estimate_task_duration(task_key)
778                };
779                let expected_completion = task_started + est_duration;
780
781                // If task still running at this slot, count its resources
782                if expected_completion > slot_time {
783                    cpu_usage += task_cpu / 100.0;
784                    mem_usage += task_mem / self.system_info.get_free_memory_mb() as f32;
785                }
786            }
787
788            // Cap at reasonable values
789            cpu_usage = cpu_usage.min(1.0).max(0.1);
790            mem_usage = mem_usage.min(1.0).max(0.1);
791
792            cpu_timeline.push(cpu_usage);
793            mem_timeline.push(mem_usage);
794        }
795
796        // 4. Find optimal scheduling slot for each queued task
797        let mut scheduled_tasks = Vec::new();
798
799        for task in self.task_queue() {
800            if !matches!(task.status, TaskStatus::Queued) {
801                continue;
802            }
803
804            let (task_cpu, task_mem) = self.predict_task_resources(&task.kind);
805            let normalized_cpu = task_cpu / 100.0;
806            let normalized_mem = task_mem / self.system_info.get_free_memory_mb() as f32;
807
808            // Find the best slot to schedule this task
809            let mut best_slot = 0;
810            let mut best_fit = f32::MAX;
811
812            for slot in 0..timeline_slots {
813                // Check if resources available in this slot
814                let slot_cpu = cpu_timeline[slot];
815                let slot_mem = mem_timeline[slot];
816
817                if slot_cpu + normalized_cpu <= 1.0 && slot_mem + normalized_mem <= 1.0 {
818                    // Calculate how well the task fits this slot (smaller is better)
819                    let fit = normalized_cpu / (1.0 - slot_cpu) + normalized_mem / (1.0 - slot_mem);
820
821                    if fit < best_fit {
822                        best_fit = fit;
823                        best_slot = slot;
824                    }
825                }
826            }
827
828            // If we found a slot, schedule task with delay
829            if best_fit < f32::MAX {
830                let delay_ms = look_ahead_ms * best_slot as u64 / timeline_slots as u64;
831
832                // Add task to the scheduled list with its delay
833                scheduled_tasks.push((task.id, delay_ms));
834
835                // Update resource timeline
836                if best_slot < timeline_slots {
837                    for slot in best_slot..timeline_slots {
838                        cpu_timeline[slot] += normalized_cpu;
839                        mem_timeline[slot] += normalized_mem;
840                    }
841                }
842            }
843        }
844
845        // 5. Apply scheduling delays
846        for (task_id, delay_ms) in scheduled_tasks {
847            if let Some(task) = self.task_queue_mut().iter_mut().find(|t| t.id == task_id) {
848                task.scheduled_delay = Some(Duration::from_millis(delay_ms));
849            }
850        }
851
852        Ok(())
853    }
854    /// Prioritize tasks based on observer position in Tonnetz space
855    pub fn prioritize_tonnetz_tasks(&mut self) -> crate::Result<()>
856    where
857        D: TriadDriver,
858    {
859        // 1. Find observer position (a node with Observer role)
860        let mut observer_position = None;
861
862        for (&edge_id, node) in self.fragment.partitions() {
863            if matches!(node.operator(), crate::actors::Operator::Observer(_)) {
864                observer_position = Some(edge_id);
865                break;
866            }
867        }
868
869        // If no observer found, use any node
870        let observer_position = observer_position.unwrap_or_else(|| {
871            self.fragment
872                .partitions()
873                .keys()
874                .next()
875                .copied()
876                .unwrap_or_default()
877        });
878
879        // 2. Collect tasks and their affected nodes first
880        let mut task_adjustments = Vec::new();
881
882        for (idx, task) in self.task_queue().iter().enumerate() {
883            if !matches!(task.status, TaskStatus::Queued) {
884                continue;
885            }
886
887            // Get the node ID affected by this task
888            let affected_node = match &task.kind {
889                TaskType::Transform { node_id, .. } => Some(*node_id),
890                TaskType::OptimizeNode { node_id, .. } => Some(*node_id),
891                TaskType::ChangeNodeMode(node_id, _) => Some(*node_id),
892                TaskType::LearnPattern { node_id, .. } => Some(*node_id),
893                TaskType::EvaluateEnsemble { node_id, .. } => Some(*node_id),
894                _ => None, // Tasks that don't target specific nodes
895            };
896
897            if let Some(node_id) = affected_node {
898                // Calculate voice-leading distance between observer and affected node
899                if let Some(distance) = self.voice_leading_distance(observer_position, node_id) {
900                    // Convert to octave-based distance factor
901                    let octave_layer = distance as f32 / 12.0;
902                    let distance_factor = (1.0 + octave_layer).recip();
903
904                    // Calculate adjusted priority
905                    let base_priority = task.priority as f32;
906                    let new_priority = (base_priority * distance_factor).round() as usize;
907
908                    // Store index and new priority for later application
909                    task_adjustments.push((idx, new_priority));
910                }
911            }
912        }
913
914        // 3. Apply the priority adjustments after computation is done
915        for (idx, priority) in task_adjustments {
916            if let Some(task) = self.task_queue_mut().get_mut(idx) {
917                task.effective_priority = priority;
918            }
919        }
920
921        // 4. Re-sort task queue
922        self.task_queue_mut()
923            .sort_by(|a, b| b.effective_priority.cmp(&a.effective_priority));
924
925        Ok(())
926    }
927    /// Process messages between nodes
928    pub fn process_messages(&mut self) -> crate::Result<()>
929    where
930        D: TriadDriver,
931    {
932        self.orchestrator.process_messages(&mut self.fragment)
933    }
934    /// Run maintenance including knowledge coordination
935    pub fn run_maintenance(&mut self) -> crate::Result<Vec<usize>>
936    where
937        D: TriadDriver,
938    {
939        let mut task_ids = Vec::new();
940
941        // 1. Memory optimization
942        let memory_task_id = self.schedule_task(
943            TaskType::OptimizeMemory { max_features: 5000 },
944            3, // Low priority
945        );
946        task_ids.push(memory_task_id);
947
948        // 2. Knowledge transfer (higher priority than memory optimization)
949        let knowledge_task_id = self.schedule_knowledge_transfer(6);
950        task_ids.push(knowledge_task_id);
951
952        // 3. Resource balancing
953        let balance_task_id = self.schedule_task(
954            TaskType::BalanceResources,
955            4, // Medium priority
956        );
957        task_ids.push(balance_task_id);
958
959        Ok(task_ids)
960    }
961    /// Get detailed resource metrics
962    pub fn runtime_metrics(&self) -> crate::ResourceMetrics {
963        crate::ResourceMetrics::default()
964            .with_cpu_usage(self.system_info.get_cpu_usage())
965            .with_memory_usage(self.system_info.memory_usage())
966            .with_available_memory(self.system_info.get_free_memory_mb())
967            .with_partition_count(self.fragment.partition_count())
968            .with_edge_count(self.fragment.edge_count())
969            .with_vertex_count(self.fragment.vertex_count())
970            .with_queued_tasks(self.task_queue().len())
971            .with_running_tasks(self.orchestrator.active_task_count())
972            .with_learning_nodes(
973                self.fragment
974                    .partitions()
975                    .values()
976                    .filter(|node| node.has_surface_network())
977                    .count(),
978            )
979            .with_feature_count(
980                self.fragment
981                    .partitions()
982                    .values()
983                    .map(|node| node.feature_count())
984                    .sum::<usize>(),
985            )
986    }
987    /// Schedule a knowledge transfer task with the given priority
988    pub fn schedule_knowledge_transfer(&mut self, priority: usize) -> usize {
989        // Schedule a task for the next step
990        self.schedule_task(TaskType::CoordinateLearning, priority)
991    }
992    /// Schedule memory optimization based on current memory usage
993    pub fn schedule_memory_optimization(&mut self) -> crate::Result<usize> {
994        // Calculate total features across the fragment
995        let total_features: usize = self
996            .fragment
997            .partitions()
998            .values()
999            .map(|node| node.feature_count())
1000            .sum();
1001
1002        // Get node count for average calculation
1003        let node_count = self.fragment.partitions().len().max(1);
1004
1005        // Calculate average features per node
1006        let avg_features_per_node = total_features / node_count;
1007
1008        // Set target max features - higher for busy fragments
1009        let max_features = if avg_features_per_node > 1000 {
1010            // Heavy memory usage - be more aggressive
1011            avg_features_per_node * 2 / 3
1012        } else {
1013            // Normal usage - be conservative
1014            avg_features_per_node * 3 / 4
1015        }
1016        .max(500); // Ensure reasonable minimum
1017
1018        // Schedule the optimization task with appropriate priority
1019        let priority = if avg_features_per_node > 2000 {
1020            // High memory pressure - higher priority
1021            7
1022        } else {
1023            // Normal memory pressure - standard maintenance priority
1024            3
1025        };
1026
1027        // Schedule the task
1028        let task_id = self.schedule_task(TaskType::OptimizeMemory { max_features }, priority);
1029        Ok(task_id)
1030    }
1031    /// Schedule a task for execution
1032    pub fn schedule_task(&mut self, task_type: TaskType, priority: usize) -> usize {
1033        // Use the scheduler to create and queue the task
1034        self.scheduler.create_task(task_type, priority)
1035    }
1036    /// Generate a comprehensive system health report with performance insights
1037    pub fn system_health_report(&self) -> String {
1038        let metrics = self.runtime_metrics();
1039        let info = self.system_info();
1040        let timestamp = eryon::Timestamp::<u64>::now_in_secs();
1041
1042        let mut report = String::new();
1043
1044        // Header with timestamp
1045        report.push_str(&format!("System Health Report - {}\n", timestamp));
1046        report.push_str("==================================\n\n");
1047
1048        // System resources section
1049        report.push_str("SYSTEM RESOURCES\n");
1050        report.push_str("-----------------\n");
1051        report.push_str(&format!("CPU Cores:         {}\n", info.get_cpu_cores()));
1052        report.push_str(&format!(
1053            "Memory:            {} MB\n",
1054            info.get_free_memory_mb()
1055        ));
1056        report.push_str(&format!(
1057            "CPU Usage:         {:.1}%\n",
1058            metrics.cpu_usage() * 100.0
1059        ));
1060        report.push_str(&format!(
1061            "Memory Usage:      {:.1}%\n",
1062            metrics.memory_usage() * 100.0
1063        ));
1064        report.push_str(&format!(
1065            "Available Compute: {}%\n\n",
1066            metrics.available_compute()
1067        ));
1068
1069        // Fragment topology section
1070        report.push_str("TONNETZ TOPOLOGY\n");
1071        report.push_str("----------------\n");
1072        report.push_str(&format!(
1073            "Partitions:        {}\n",
1074            metrics.partition_count()
1075        ));
1076        report.push_str(&format!("Vertices:          {}\n", metrics.vertex_count()));
1077        report.push_str(&format!("Edges:             {}\n\n", metrics.edge_count()));
1078
1079        // Runtime activity section
1080        report.push_str("RUNTIME ACTIVITY\n");
1081        report.push_str("----------------\n");
1082        report.push_str(&format!("Queued Tasks:      {}\n", metrics.queued_tasks()));
1083        report.push_str(&format!("Running Tasks:     {}\n", metrics.running_tasks()));
1084
1085        // Get task distribution
1086        let mut task_counts = std::collections::HashMap::new();
1087        for task in self.task_queue() {
1088            let key = TaskTypeKey::from(&task.kind);
1089            *task_counts.entry(key).or_insert(0) += 1;
1090        }
1091
1092        // Show task distribution if there are any tasks
1093        if !task_counts.is_empty() {
1094            report.push_str("Task Distribution:\n");
1095            for (task_type, count) in task_counts {
1096                report.push_str(&format!("  - {}: {}\n", task_type, count));
1097            }
1098        }
1099        report.push_str("\n");
1100
1101        // Learning status
1102        report.push_str("LEARNING STATUS\n");
1103        report.push_str("--------------\n");
1104        report.push_str(&format!(
1105            "Learning Nodes:     {}\n",
1106            metrics.learning_nodes()
1107        ));
1108        report.push_str(&format!(
1109            "Surface Networks:   {}\n",
1110            metrics.surface_network_count()
1111        ));
1112        report.push_str(&format!(
1113            "Features:          {}\n\n",
1114            metrics.feature_count()
1115        ));
1116
1117        // Capabilities
1118        report.push_str("CAPABILITIES\n");
1119        report.push_str("-----------\n");
1120        for capability in metrics.capabilities() {
1121            report.push_str(&format!("- {}\n", capability));
1122        }
1123        report.push_str("\n");
1124
1125        // Performance assessment
1126        report.push_str("PERFORMANCE ASSESSMENT\n");
1127        report.push_str("---------------------\n");
1128
1129        // Memory pressure assessment
1130        let memory_ratio =
1131            (metrics.feature_count() as f32) / (1000.0 * metrics.partition_count().max(1) as f32);
1132        let memory_status = if memory_ratio > 2.0 {
1133            "HIGH - Consider memory optimization"
1134        } else if memory_ratio > 1.0 {
1135            "MODERATE - Monitor memory growth"
1136        } else {
1137            "GOOD - Memory usage is acceptable"
1138        };
1139        report.push_str(&format!("Memory Pressure:    {}\n", memory_status));
1140
1141        // CPU pressure assessment
1142        let cpu_status = if metrics.cpu_usage() > 0.8 {
1143            "HIGH - Consider scaling out"
1144        } else if metrics.cpu_usage() > 0.5 {
1145            "MODERATE - Monitor CPU usage"
1146        } else {
1147            "GOOD - CPU usage is acceptable"
1148        };
1149        report.push_str(&format!("CPU Pressure:       {}\n", cpu_status));
1150
1151        // Task backlog assessment
1152        let task_ratio = metrics.queued_tasks() as f32 / (info.get_cpu_cores() * 2) as f32;
1153        let task_status = if task_ratio > 3.0 {
1154            "HIGH - Large task backlog"
1155        } else if task_ratio > 1.0 {
1156            "MODERATE - Tasks may experience delays"
1157        } else {
1158            "GOOD - Task processing is timely"
1159        };
1160        report.push_str(&format!("Task Backlog:       {}\n", task_status));
1161
1162        report
1163    }
1164    /// Tune surface network parameters based on node performance
1165    pub fn tune_surface_networks(&mut self) -> crate::Result<()>
1166    where
1167        D: TriadDriver,
1168    {
1169        // Analyze performance of existing surface networks
1170        let mut performance_metrics = HashMap::new();
1171
1172        for (&edge_id, node) in self.fragment.partitions() {
1173            if node.has_surface_network() {
1174                // Get learning metrics for this node
1175                let accuracy = node.get_learning_accuracy();
1176                let convergence_rate = node.get_convergence_rate();
1177                let feature_quality = node.get_feature_quality();
1178
1179                // Combined performance score
1180                let performance = accuracy * 0.5 + convergence_rate * 0.3 + feature_quality * 0.2;
1181
1182                performance_metrics.insert(edge_id, performance);
1183            }
1184        }
1185
1186        // Find best and worst performing nodes
1187        if performance_metrics.is_empty() {
1188            return Ok(());
1189        }
1190
1191        let mut entries: Vec<_> = performance_metrics.iter().collect();
1192        entries.sort_by(|a, b| b.1.partial_cmp(a.1).unwrap_or(std::cmp::Ordering::Equal));
1193
1194        let best_node_id = *entries.first().unwrap().0;
1195
1196        // Get parameters from best node
1197        if let Some(best_node) = self.fragment.find_node_by_id(best_node_id) {
1198            let optimal_params = best_node.get_surface_parameters();
1199
1200            // Apply to lower performing nodes
1201            for (edge_id, performance) in performance_metrics {
1202                // Only adjust nodes with below-average performance
1203                if performance < 0.7 && edge_id != best_node_id {
1204                    if let Some(node) = self.fragment.find_node_by_id_mut(edge_id) {
1205                        // Adjust toward optimal parameters
1206                        node.adjust_surface_parameters(&optimal_params, 0.3)?;
1207                    }
1208                }
1209            }
1210        }
1211
1212        Ok(())
1213    }
1214    /// compute the vocal leading distance between two facets; see
1215    /// [voice_leading_distance](crate::FragTonnetz::voice_leading_distance) for more information
1216    pub fn voice_leading_distance(&self, from: EdgeId, to: EdgeId) -> Option<usize> {
1217        self.fragment.voice_leading_distance(from, to)
1218    }
1219}
1220
1221impl<D> RuntimeInner<D>
1222where
1223    D: Driver<Triad>,
1224{
1225    /// Compute a hash representing the consensus state of knowledge
1226    #[doc(hidden)]
1227    pub fn compute_knowledge_consensus_hash(
1228        &self,
1229        patterns: &HashMap<EdgeId, Vec<Vec<usize>>>,
1230    ) -> [u8; 32] {
1231        use std::collections::hash_map::DefaultHasher;
1232        use std::hash::Hasher;
1233
1234        // Simple hash implementation - would be replaced by cryptographic consensus
1235        let mut hasher = DefaultHasher::new();
1236
1237        // Order by node ID for determinism
1238        let mut ordered_nodes: Vec<_> = patterns.keys().collect();
1239        ordered_nodes.sort();
1240
1241        for &node_id in &ordered_nodes {
1242            if let Some(node_patterns) = patterns.get(node_id) {
1243                // Hash node patterns
1244                hasher.write_usize(**node_id);
1245                hasher.write_usize(node_patterns.len());
1246
1247                for pattern in node_patterns {
1248                    hasher.write_usize(pattern.len());
1249                    for &value in pattern {
1250                        hasher.write_usize(value);
1251                    }
1252                }
1253            }
1254        }
1255
1256        // Convert to 32 byte array (simulating SHA-256)
1257        let hash_value = hasher.finish();
1258        let mut result = [0u8; 32];
1259        let bytes = hash_value.to_be_bytes();
1260        result[..bytes.len()].copy_from_slice(&bytes);
1261
1262        result
1263    }
1264    // TODO: integrate with consensus mechanisms & hybrid-multichain models
1265    /// Register a knowledge transfer event
1266    #[allow(dead_code)]
1267    fn register_knowledge_transfer(&mut self, consensus_hash: [u8; 32], node_count: usize) {
1268        // Record the transfer in system metrics
1269        let timestamp = eryon::Timestamp::<u64>::now_in_secs();
1270
1271        // Store in runtime properties
1272        let transfer_key = format!("knowledge_transfer_{timestamp}");
1273        let transfer_value = format!("{:?}:{}", consensus_hash, node_count);
1274
1275        // Store this in the fragment's properties
1276        for node in self.fragment.partitions_mut().values_mut() {
1277            node.memory_mut()
1278                .set_property(&transfer_key, &transfer_value);
1279        }
1280
1281        // Log the transfer
1282        #[cfg(feature = "tracing")]
1283        tracing::info!(
1284            "Knowledge transfer completed: {} nodes, consensus: {:?}",
1285            node_count,
1286            &consensus_hash[0..4]
1287        );
1288    }
1289}
1290
1291impl<D> Default for RuntimeInner<D>
1292where
1293    D: Driver<Triad>,
1294{
1295    fn default() -> Self {
1296        Self::new()
1297    }
1298}
1299
1300impl<D> eryon::Execute<usize> for RuntimeInner<D>
1301where
1302    D: TriadDriver,
1303{
1304    type Output = crate::Result;
1305    fn execute(mut self, rhs: usize) -> Self::Output {
1306        self.execute_task(rhs)
1307    }
1308}
1309
1310impl<D> eryon::ExecuteMut<usize> for RuntimeInner<D>
1311where
1312    D: TriadDriver,
1313{
1314    type Output = crate::Result;
1315    fn execute_mut(&mut self, rhs: usize) -> Self::Output {
1316        self.execute_task(rhs)
1317    }
1318}