1use crate::runtime::RuntimeInner;
6
7use crate::actors::prelude::{Driver, TriadDriver, VNode};
8use crate::fragment::FragTonnetz;
9use crate::nrt::Triad;
10use crate::orcha::Orchestrator;
11use crate::scheduler::Scheduler;
12use crate::tasks::{Task, TaskStatus, TaskType, TaskTypeKey};
13use crate::types::{PerformanceTracker, SystemInfo};
14use rshyper::EdgeId;
15use std::collections::HashMap;
16use std::time::{Duration, Instant};
17
18impl<D> RuntimeInner<D>
19where
20 D: Driver<Triad>,
21{
22 pub fn new() -> Self {
24 let fragment = FragTonnetz::new();
25 let orchestrator = Orchestrator::new();
26 let system_info = SystemInfo::new();
27 let perf_tracker = PerformanceTracker::new();
28
29 Self {
30 fragment,
31 orchestrator,
32 scheduler: Scheduler::new(),
33 perf_tracker,
34 system_info,
35 }
36 }
37 pub fn fragment(&self) -> &FragTonnetz<D> {
39 &self.fragment
40 }
41 pub fn fragment_mut(&mut self) -> &mut FragTonnetz<D> {
43 &mut self.fragment
44 }
45 pub fn orchestrator(&self) -> &Orchestrator {
47 &self.orchestrator
48 }
49 pub fn orchestrator_mut(&mut self) -> &mut Orchestrator {
51 &mut self.orchestrator
52 }
53 pub const fn perf_tracker(&self) -> &PerformanceTracker {
55 &self.perf_tracker
56 }
57 pub fn perf_tracker_mut(&mut self) -> &mut PerformanceTracker {
59 &mut self.perf_tracker
60 }
61 pub fn performance_trend_report(&self) -> String {
63 self.perf_tracker.generate_trend_report()
64 }
65 pub const fn scheduler(&self) -> &Scheduler {
67 &self.scheduler
68 }
69 pub fn scheduler_mut(&mut self) -> &mut Scheduler {
71 &mut self.scheduler
72 }
73 pub const fn system_info(&self) -> &SystemInfo {
75 &self.system_info
76 }
77 pub fn system_info_mut(&mut self) -> &mut SystemInfo {
79 &mut self.system_info
80 }
81 pub fn task_queue(&self) -> &[Task] {
83 &self.scheduler.task_queue
84 }
85 pub fn task_queue_mut(&mut self) -> &mut Vec<Task> {
87 &mut self.scheduler.task_queue
88 }
89 pub fn vnodes(&self) -> &HashMap<EdgeId, VNode<D>> {
91 self.fragment.partitions()
92 }
93 pub fn vnodes_mut(&mut self) -> &mut HashMap<EdgeId, VNode<D>> {
95 self.fragment.partitions_mut()
96 }
97 pub fn set_fragment(&mut self, fragment: FragTonnetz<D>) {
99 self.fragment = fragment;
100 }
101 pub fn set_orchestrator(&mut self, orchestrator: Orchestrator) {
103 self.orchestrator = orchestrator;
104 }
105 pub fn set_scheduler(&mut self, scheduler: Scheduler) {
107 self.scheduler = scheduler;
108 }
109 pub fn with_fragment(self, fragment: FragTonnetz<D>) -> Self {
111 Self { fragment, ..self }
112 }
113 pub fn with_orchestrator(self, orchestrator: Orchestrator) -> Self {
115 Self {
116 orchestrator,
117 ..self
118 }
119 }
120 pub fn with_scheduler(self, scheduler: Scheduler) -> Self {
122 Self { scheduler, ..self }
123 }
124 pub fn update_performance_metrics(&mut self) {
126 let metrics = self.runtime_metrics();
128
129 self.perf_tracker.record_metrics(metrics);
131 }
132}
133
134impl<D> RuntimeInner<D>
135where
136 D: Driver<Triad>,
137{
    /// Re-ranks the queued tasks using historical execution times, current
    /// system load, task age and per-node load, then sorts the queue by the
    /// resulting `effective_priority` (highest first).
    ///
    /// The adjusted priority is
    /// `base * (0.3*time + 0.2*load + 0.2*age + 0.3*node)`, so it can never
    /// exceed the task's base priority.
    pub fn adaptive_schedule(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Accumulate (total execution time, run count) per task-type key
        // from the scheduler's completed-task history.
        let mut task_type_metrics = HashMap::new();

        for completed_task in self.scheduler.completed_tasks() {
            // Fall back to "now" / creation time when the scheduler never
            // stamped completion / start, so the subtraction cannot panic.
            let execution_time = completed_task
                .completed_at
                .unwrap_or_else(Instant::now)
                .duration_since(
                    completed_task
                        .started_at
                        .unwrap_or(completed_task.created_at),
                );

            let key = TaskTypeKey::from(&completed_task.kind);

            let entry = task_type_metrics
                .entry(key)
                .or_insert_with(|| (Duration::from_secs(0), 0));

            entry.0 += execution_time;
            entry.1 += 1;
        }

        // Average execution time per task type (skipping empty buckets).
        let mut avg_execution_times = HashMap::new();
        for (key, (total_time, count)) in task_type_metrics {
            if count > 0 {
                let avg_time = total_time / count as u32;
                avg_execution_times.insert(key, avg_time);
            }
        }

        // NOTE(review): both loads are assumed to be normalized to [0, 1] —
        // confirm against SystemInfo/FragTonnetz.
        let system_load = self.system_info.get_current_load();
        let node_loads = self.fragment.get_node_loads();

        let now = Instant::now();
        for task in self.task_queue_mut() {
            if matches!(task.status, TaskStatus::Queued) {
                let base_priority = task.priority as f32;

                // Shorter expected runtime => higher factor; anything at or
                // above 60s bottoms out at 0. Unknown types get a neutral 0.5.
                let key = TaskTypeKey::from(&task.kind);
                let time_factor = if let Some(avg_time) = avg_execution_times.get(&key) {
                    let max_time = Duration::from_secs(60);
                    1.0 - (avg_time.as_secs_f32() / max_time.as_secs_f32()).min(1.0)
                } else {
                    0.5
                };

                // A lightly-loaded system boosts everything uniformly.
                let load_factor = 1.0 - system_load;

                // Older tasks get boosted, saturating after one minute.
                let age = now.duration_since(task.created_at);
                let age_factor = (age.as_secs_f32() / 60.0).min(1.0);
                // Node-targeted tasks prefer idle nodes; others are neutral.
                let node_factor = match &task.kind {
                    TaskType::Transform { node_id, .. } => {
                        1.0 - node_loads.get(node_id).copied().unwrap_or(0.5)
                    }
                    TaskType::LearnPattern { node_id, .. } => {
                        1.0 - node_loads.get(node_id).copied().unwrap_or(0.5)
                    }
                    _ => 0.5,
                };

                let adjusted_priority = base_priority
                    * (0.3 * time_factor
                        + 0.2 * load_factor
                        + 0.2 * age_factor
                        + 0.3 * node_factor);

                task.effective_priority = adjusted_priority.round() as usize;
            }
        }

        // Highest effective priority first.
        self.task_queue_mut()
            .sort_by(|a, b| b.effective_priority.cmp(&a.effective_priority));

        Ok(())
    }
233 pub fn add_triad(&mut self, triad: Triad) -> crate::Result<rshyper::EdgeId> {
235 self.fragment.add_triad(triad)
236 }
237 pub fn auto_configure(&mut self) -> crate::Result<()> {
239 let optimal_partitions = self.system_info.get_cpu_cores().max(1);
241
242 if self.fragment.partition_count() >= optimal_partitions {
244 return Ok(());
245 }
246
247 self.init_complete_layer()
249 }
250
251 pub fn completed_tasks(&self) -> Vec<&Task> {
252 self.scheduler.completed_tasks()
253 }
    /// Runs one round of federated-style learning across nodes that share a
    /// triad class: gradients are extracted from every learning node of the
    /// class, pooled per pattern id, and re-applied to each member.
    ///
    /// Returns the id of the follow-up `SharePatterns` task queued at the
    /// end of the round. An `EvaluatePerformance` task (priority 8) is also
    /// queued for every class that actually federated.
    pub fn coordinate_distributed_learning(&mut self) -> crate::Result<usize>
    where
        D: TriadDriver,
    {
        // Group learning-capable nodes (those with a surface network) by class.
        let mut nodes_by_class = std::collections::HashMap::new();

        for (&edge_id, node) in self.fragment.partitions() {
            if node.has_surface_network() {
                nodes_by_class
                    .entry(node.class())
                    .or_insert_with(Vec::new)
                    .push(edge_id);
            }
        }

        for (_class, node_ids) in nodes_by_class {
            // Federation needs at least two participants.
            if node_ids.len() <= 1 {
                continue;
            }

            // Collect each member's gradients; extraction failures abort the
            // whole round via `?`.
            let mut all_gradients = Vec::new();

            for &node_id in &node_ids {
                if let Some(node) = self.fragment.find_node_by_id(node_id) {
                    let gradients = node.extract_learning_gradients()?;
                    all_gradients.push((node_id, gradients));
                }
            }

            // Pool gradients per pattern id.
            // NOTE(review): despite the name, nothing is averaged here — each
            // entry holds the raw list of per-node gradients, so any actual
            // averaging must happen inside `apply_federated_gradients`. Confirm.
            let mut averaged_gradients = HashMap::new();

            for (_node_id, gradients) in all_gradients {
                for (pattern_id, gradient) in gradients {
                    averaged_gradients
                        .entry(pattern_id)
                        .or_insert_with(Vec::new)
                        .push(gradient);
                }
            }

            // Push the pooled gradients back to every member of the class.
            for &node_id in &node_ids {
                if let Some(node) = self.fragment.find_node_by_id_mut(node_id) {
                    node.apply_federated_gradients(&averaged_gradients)?;
                }
            }

            // Queue a performance check for this federated class.
            self.schedule_task(TaskType::EvaluatePerformance, 8);
        }

        let task_id = self.schedule_task(TaskType::SharePatterns, 6);

        Ok(task_id)
    }
    /// Propagates learned patterns between knowledge-rich nodes, weighting
    /// how much each target receives by its voice-leading proximity to the
    /// source, and finally records a consensus fingerprint of the round.
    pub fn coordinate_knowledge_transfer(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Only nodes with a surface network and more than 10 features
        // participate; a single (or no) participant means nothing to transfer.
        let nodes_with_knowledge: Vec<_> = self
            .fragment
            .partitions()
            .iter()
            .filter(|(_, node)| node.has_surface_network() && node.feature_count() > 10)
            .map(|(&id, _)| id)
            .collect();

        if nodes_with_knowledge.len() <= 1 {
            return Ok(());
        }

        // Extract each participant's patterns; per-node extraction failures
        // are logged (when tracing is on) and skipped, not fatal.
        let mut knowledge_patterns = HashMap::new();
        for &node_id in &nodes_with_knowledge {
            if let Some(node) = self.fragment.find_node_by_id(node_id) {
                match node.extract_knowledge_patterns() {
                    Ok(patterns) => {
                        if !patterns.is_empty() {
                            knowledge_patterns.insert(node_id, patterns);
                        }
                    }
                    Err(_e) => {
                        #[cfg(feature = "tracing")]
                        tracing::warn!(
                            "Failed to extract knowledge from node {}: {:?}",
                            node_id,
                            _e
                        );
                        continue;
                    }
                }
            }
        }

        if knowledge_patterns.is_empty() {
            return Ok(());
        }

        // Fingerprint of this round's source material (see
        // `compute_knowledge_consensus_hash` for caveats).
        let consensus_hash = self.compute_knowledge_consensus_hash(&knowledge_patterns);

        // Plan transfers first (immutable pass), apply them afterwards
        // (mutable pass) to keep the borrows of `self.fragment` disjoint.
        let mut transfer_data = Vec::new();

        for &target_id in &nodes_with_knowledge {
            let mut source_pattern_data = Vec::new();

            for (&source_id, patterns) in &knowledge_patterns {
                if source_id != target_id {
                    // Proximity decays with voice-leading distance
                    // (1.0 at distance 0, 0.5 at distance 6); unreachable
                    // pairs get a neutral 0.5.
                    let proximity = if let Some(distance) =
                        self.fragment.voice_leading_distance(target_id, source_id)
                    {
                        1.0 / (1.0 + distance as f32 / 6.0)
                    } else {
                        0.5
                    };

                    // Close sources share proportionally more of their patterns.
                    if proximity > 0.3 {
                        let pattern_count = (patterns.len() as f32 * proximity).ceil() as usize;
                        if pattern_count > 0 {
                            let selected_patterns = if pattern_count >= patterns.len() {
                                patterns.clone()
                            } else {
                                // NOTE(review): "first n" selection — pattern
                                // ordering decides what gets shared; confirm
                                // the extraction order is meaningful.
                                patterns.iter().take(pattern_count).cloned().collect()
                            };

                            source_pattern_data.push((source_id, selected_patterns));
                        }
                    }
                }
            }

            if !source_pattern_data.is_empty() {
                transfer_data.push((target_id, source_pattern_data));
            }
        }

        // Apply the planned transfers; integration failures are logged and
        // do not abort the round.
        for (target_id, source_patterns) in transfer_data {
            if let Some(target_node) = self.fragment.find_node_by_id_mut(target_id) {
                for (source_id, patterns) in source_patterns {
                    if let Err(e) = target_node.integrate_external_knowledge(&patterns, source_id) {
                        #[cfg(feature = "tracing")]
                        tracing::error!("Failed to integrate knowledge: {:?}", e);
                        #[cfg(not(feature = "tracing"))]
                        eprintln!("Failed to integrate knowledge: {:?}", e);
                    }
                }
            }
        }

        // Stamp every node's memory with the round's fingerprint.
        self.register_knowledge_transfer(consensus_hash, nodes_with_knowledge.len());

        Ok(())
    }
    /// Simpler, all-to-all variant of knowledge sharing: every node with at
    /// least one feature broadcasts its patterns to up to five other
    /// learning nodes, then a `knowledge_coordination` event is recorded on
    /// every partition's memory.
    pub fn coordinate_learning(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Participants: nodes with a surface network and any features at all
        // (contrast with `coordinate_knowledge_transfer`, which requires > 10).
        let nodes_with_knowledge: Vec<_> = self
            .fragment
            .partitions()
            .iter()
            .filter(|(_, node)| node.has_surface_network() && node.feature_count() > 0)
            .map(|(&id, _)| id)
            .collect();

        if nodes_with_knowledge.len() <= 1 {
            return Ok(());
        }

        // Immutable pass: pull non-empty pattern sets; extraction errors are
        // silently skipped here.
        let mut extracted_patterns = Vec::with_capacity(nodes_with_knowledge.len());

        for &node_id in &nodes_with_knowledge {
            if let Some(node) = self.find_node(node_id) {
                match node.extract_knowledge_patterns() {
                    Ok(patterns) if !patterns.is_empty() => {
                        extracted_patterns.push((node_id, patterns));
                    }
                    _ => continue,
                }
            }
        }

        // Fan-out cap: at most 5 targets per source (and never more than the
        // number of other participants).
        let max_targets_per_source = std::cmp::min(5, nodes_with_knowledge.len() - 1);

        for (source_id, patterns) in extracted_patterns {
            if patterns.is_empty() {
                continue;
            }

            let mut target_count = 0;

            for &target_id in &nodes_with_knowledge {
                if target_id != source_id && target_count < max_targets_per_source {
                    if let Some(target_node) = self.find_node_mut(target_id) {
                        // Integration failures are logged, not fatal.
                        if let Err(_e) =
                            target_node.integrate_external_knowledge(&patterns, source_id)
                        {
                            #[cfg(feature = "tracing")]
                            tracing::error!("Failed to integrate knowledge: {_e:?}");
                            #[cfg(not(feature = "tracing"))]
                            eprintln!("Failed to integrate knowledge: {_e:?}");
                        }

                        target_count += 1;
                    }
                }
            }
        }

        // Audit trail: every partition remembers that a coordination round
        // with this many participants happened.
        for node in self.fragment_mut().partitions_mut().values_mut() {
            node.memory_mut().record_event(
                "knowledge_coordination",
                Some(vec![nodes_with_knowledge.len()]),
            );
        }

        Ok(())
    }
498 pub fn execute_task(&mut self, task_id: usize) -> crate::Result<()>
500 where
501 D: TriadDriver,
502 {
503 let task = self
504 .scheduler
505 .get_task(task_id)
506 .ok_or_else(|| crate::RuntimeError::TaskNotFound(task_id))?;
507 self.orchestrator.execute_task(&mut self.fragment, task)
508 }
509 pub fn execute_tasks(&mut self) -> crate::Result<()>
511 where
512 D: TriadDriver,
513 {
514 let _res = self
516 .scheduler
517 .process_batch(&mut self.fragment, &mut self.orchestrator)?;
518 Ok(())
519 }
520 pub fn find_node(&self, id: rshyper::Index) -> Option<&VNode<D>> {
522 self.fragment().find_node_by_id(id)
523 }
524 pub fn find_node_mut(&mut self, id: rshyper::Index) -> Option<&mut VNode<D>> {
526 self.fragment_mut().find_node_by_id_mut(id)
527 }
528 pub fn init_complete_layer(&mut self) -> crate::Result<()> {
530 self.fragment.initialize_with_all_triads()?;
532
533 self.orchestrator.initialize(&self.fragment);
535
536 Ok(())
537 }
538 pub fn init_node_surfaces(&mut self) -> crate::Result<()>
540 where
541 D: TriadDriver,
542 {
543 let mut initialized_count = 0;
544
545 for (_, node) in self.fragment.partitions_mut() {
547 if node.has_surface_network() {
549 continue;
550 }
551
552 node.init_surface()?;
554
555 initialized_count += 1;
557 }
558
559 if initialized_count > 0 {
560 #[cfg(feature = "tracing")]
561 tracing::debug!("Initialized {} node surface networks", initialized_count);
562 }
563
564 Ok(())
565 }
566 pub fn init_specialized_surfaces(&mut self) -> crate::Result<()>
568 where
569 D: TriadDriver,
570 {
571 use crate::nrt::Triads::*;
572 let mut nodes_by_class = std::collections::HashMap::new();
574
575 for (&edge_id, node) in self.fragment.partitions() {
576 nodes_by_class
577 .entry(node.class())
578 .or_insert_with(Vec::new)
579 .push(edge_id);
580 }
581
582 let class_params: HashMap<_, _> = [
584 (Major, (0.02, 0.6, 0.4)),
586 (Minor, (0.01, 0.8, 0.5)),
588 (Diminished, (0.04, 0.7, 0.6)),
590 (Augmented, (0.05, 0.5, 0.8)),
592 ]
593 .into_iter()
594 .collect();
595
596 for (class, node_ids) in &nodes_by_class {
598 let params = class_params.get(&class).unwrap_or(&(0.01, 0.7, 0.5));
599
600 for &edge_id in node_ids {
601 if let Some(node) = self.fragment.find_node_by_id_mut(edge_id) {
602 if !node.has_surface_network() {
603 let config = crate::actors::surface::SurfaceModelConfig::new(
604 params.0, params.1, params.2,
605 );
606 node.surface_mut().map(|i| i.model_mut().set_config(config));
608 }
609 }
610 }
611 }
612
613 #[cfg(feature = "tracing")]
614 tracing::debug!(
615 "Initialized specialized surface networks for {} classes",
616 nodes_by_class.len()
617 );
618
619 Ok(())
620 }
621 pub fn optimize_memory_allocation(&mut self) -> crate::Result<()>
623 where
624 D: TriadDriver,
625 {
626 let mut node_memory_usage = HashMap::new();
628 let mut total_features = 0;
629
630 for (&edge_id, node) in self.fragment.partitions() {
631 let feature_count = node.feature_count();
632 node_memory_usage.insert(edge_id, feature_count);
633 total_features += feature_count;
634 }
635
636 if self.fragment.partition_count() == 0 {
638 return Ok(());
639 }
640
641 let avg_features = total_features / self.fragment.partition_count();
642 let target_max = (avg_features as f32 * 1.5) as usize; let mut nodes_to_optimize = Vec::new();
646 for (&edge_id, &feature_count) in &node_memory_usage {
647 if feature_count > target_max {
648 nodes_to_optimize.push((edge_id, feature_count));
649 }
650 }
651
652 nodes_to_optimize.sort_by(|a, b| b.1.cmp(&a.1));
654
655 for (edge_id, _) in nodes_to_optimize {
657 if let Some(node) = self.fragment.find_node_by_id_mut(edge_id) {
658 node.consolidate_similar_patterns(0.8)?;
660
661 node.prune_low_importance_features(0.2)?;
663 }
664 }
665
666 Ok(())
667 }
    /// Within each triad class, picks the learning node with the most
    /// features as the "best" source and asks the orchestrator to share its
    /// patterns with every other member of the class, then pumps the
    /// resulting messages.
    pub fn optimize_surface_networks(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Group all partition nodes by triad class.
        let mut nodes_by_class = std::collections::HashMap::new();

        for (&edge_id, node) in self.fragment.partitions() {
            nodes_by_class
                .entry(node.class())
                .or_insert_with(Vec::new)
                .push(edge_id);
        }

        for (_class, node_ids) in nodes_by_class {
            // Sharing needs at least two members.
            if node_ids.len() <= 1 {
                continue;
            }

            // Select the class's richest node that actually has a surface
            // network (feature count is the fitness proxy).
            let mut best_node_id = None;
            let mut most_features = 0;

            for &id in &node_ids {
                if let Some(node) = self.fragment.find_node_by_id(id) {
                    let features = node.feature_count();
                    if features > most_features && node.has_surface_network() {
                        best_node_id = Some(id);
                        most_features = features;
                    }
                }
            }

            if let Some(source_id) = best_node_id {
                for &target_id in &node_ids {
                    if target_id != source_id {
                        // NOTE(review): the shared pattern payload is empty
                        // (`vec![]`) — presumably the receiver pulls patterns
                        // from `source` itself; confirm PatternShare semantics.
                        self.orchestrator
                            .send_message(crate::NodeMessage::PatternShare {
                                source: source_id,
                                pattern: vec![],
                                importance: 0.8,
                            });
                    }
                }
            }
        }

        // Deliver the queued PatternShare messages immediately.
        self.process_messages()?;

        Ok(())
    }
725 pub fn predict_task_resources(&self, task_type: &TaskType) -> (f32, f32) {
727 match TaskTypeKey::from(task_type) {
729 TaskTypeKey::Transform => (5.0, 50.0),
730 TaskTypeKey::LearnPattern => (20.0, 100.0),
731 TaskTypeKey::SharePatterns => (10.0, 80.0),
732 TaskTypeKey::OptimizeNode => (15.0, 30.0),
733 _ => (5.0, 20.0), }
735 }
    /// Bin-packs queued tasks into a forecast of the next `look_ahead_ms`
    /// milliseconds: builds a 10-slot CPU/memory usage timeline from the
    /// currently running tasks, finds the best-fitting slot for each queued
    /// task, and stores the corresponding start delay on the task.
    pub fn predictive_schedule(&mut self, look_ahead_ms: u64) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Baseline utilization right now. NOTE(review): cpu/memory usage are
        // assumed to be fractions in [0, 1] — confirm against SystemInfo.
        let current_cpu = self.system_info.get_cpu_usage();
        let current_memory = self.system_info.get_ram_memory_usage();
        let available_memory =
            self.system_info.get_free_memory_mb() as f32 * (1.0 - current_memory);

        // Kept for readability of the model; currently unused.
        let _cpu_headroom = 1.0 - current_cpu;
        let _mem_headroom = available_memory;

        // Timeline slot 0 is "now"; slots 1..10 are forecasts.
        let mut cpu_timeline = vec![current_cpu];
        let mut mem_timeline = vec![current_memory];
        let timeline_slots = 10;
        let running_tasks = self.scheduler.get_running_tasks();

        for slot in 1..timeline_slots {
            let slot_time = Instant::now()
                + Duration::from_millis(look_ahead_ms * slot as u64 / timeline_slots as u64);

            let mut cpu_usage = current_cpu;
            let mut mem_usage = current_memory;

            // A running task contributes its predicted load to every slot
            // that falls before its estimated completion.
            for task in &running_tasks {
                let task_started = task.started_at.unwrap_or(task.created_at);
                let (task_cpu, task_mem) = self.predict_task_resources(&task.kind);

                let est_duration = {
                    let task_key = TaskTypeKey::from(&task.kind);
                    self.scheduler.estimate_task_duration(task_key)
                };
                let expected_completion = task_started + est_duration;

                if expected_completion > slot_time {
                    cpu_usage += task_cpu / 100.0;
                    mem_usage += task_mem / self.system_info.get_free_memory_mb() as f32;
                }
            }

            // Clamp the forecast into [0.1, 1.0].
            cpu_usage = cpu_usage.min(1.0).max(0.1);
            mem_usage = mem_usage.min(1.0).max(0.1);

            cpu_timeline.push(cpu_usage);
            mem_timeline.push(mem_usage);
        }

        // First-fit-decreasing-ish placement of queued tasks (queue order).
        let mut scheduled_tasks = Vec::new();

        for task in self.task_queue() {
            if !matches!(task.status, TaskStatus::Queued) {
                continue;
            }

            let (task_cpu, task_mem) = self.predict_task_resources(&task.kind);
            let normalized_cpu = task_cpu / 100.0;
            let normalized_mem = task_mem / self.system_info.get_free_memory_mb() as f32;

            let mut best_slot = 0;
            let mut best_fit = f32::MAX;

            for slot in 0..timeline_slots {
                let slot_cpu = cpu_timeline[slot];
                let slot_mem = mem_timeline[slot];

                // The task must fit entirely under both 100% ceilings; the
                // fit score penalizes crowding into nearly-full slots.
                if slot_cpu + normalized_cpu <= 1.0 && slot_mem + normalized_mem <= 1.0 {
                    let fit = normalized_cpu / (1.0 - slot_cpu) + normalized_mem / (1.0 - slot_mem);

                    if fit < best_fit {
                        best_fit = fit;
                        best_slot = slot;
                    }
                }
            }

            // A finite best_fit means some slot could host the task.
            if best_fit < f32::MAX {
                let delay_ms = look_ahead_ms * best_slot as u64 / timeline_slots as u64;

                scheduled_tasks.push((task.id, delay_ms));

                // Reserve the task's load from its start slot onwards.
                // (The guard is always true — `best_slot` comes from
                // `0..timeline_slots` — and is kept for clarity.)
                if best_slot < timeline_slots {
                    for slot in best_slot..timeline_slots {
                        cpu_timeline[slot] += normalized_cpu;
                        mem_timeline[slot] += normalized_mem;
                    }
                }
            }
        }

        // Write the computed delays back onto the queued tasks.
        for (task_id, delay_ms) in scheduled_tasks {
            if let Some(task) = self.task_queue_mut().iter_mut().find(|t| t.id == task_id) {
                task.scheduled_delay = Some(Duration::from_millis(delay_ms));
            }
        }

        Ok(())
    }
    /// Biases queued node-targeted tasks toward the observer's position in
    /// the tonnetz: the farther a task's node is (in voice-leading distance,
    /// scaled by octaves of 12), the lower its effective priority. The queue
    /// is re-sorted highest-first afterwards.
    pub fn prioritize_tonnetz_tasks(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Find the first node operated by an Observer.
        let mut observer_position = None;

        for (&edge_id, node) in self.fragment.partitions() {
            if matches!(node.operator(), crate::actors::Operator::Observer(_)) {
                observer_position = Some(edge_id);
                break;
            }
        }

        // No observer: fall back to an arbitrary partition (HashMap order),
        // or the default edge id if the fragment is empty.
        let observer_position = observer_position.unwrap_or_else(|| {
            self.fragment
                .partitions()
                .keys()
                .next()
                .copied()
                .unwrap_or_default()
        });

        // Plan adjustments in an immutable pass (indices into the queue),
        // apply them in a mutable pass.
        let mut task_adjustments = Vec::new();

        for (idx, task) in self.task_queue().iter().enumerate() {
            if !matches!(task.status, TaskStatus::Queued) {
                continue;
            }

            // Only node-targeted task kinds are affected.
            let affected_node = match &task.kind {
                TaskType::Transform { node_id, .. } => Some(*node_id),
                TaskType::OptimizeNode { node_id, .. } => Some(*node_id),
                TaskType::ChangeNodeMode(node_id, _) => Some(*node_id),
                TaskType::LearnPattern { node_id, .. } => Some(*node_id),
                TaskType::EvaluateEnsemble { node_id, .. } => Some(*node_id),
                _ => None,
            };

            if let Some(node_id) = affected_node {
                if let Some(distance) = self.voice_leading_distance(observer_position, node_id) {
                    // 12 semitone-steps = one "octave layer"; priority scales
                    // as 1 / (1 + layers).
                    let octave_layer = distance as f32 / 12.0;
                    let distance_factor = (1.0 + octave_layer).recip();

                    let base_priority = task.priority as f32;
                    let new_priority = (base_priority * distance_factor).round() as usize;

                    task_adjustments.push((idx, new_priority));
                }
            }
        }

        // Indices stay valid: nothing was added or removed in between.
        for (idx, priority) in task_adjustments {
            if let Some(task) = self.task_queue_mut().get_mut(idx) {
                task.effective_priority = priority;
            }
        }

        // Highest effective priority first.
        self.task_queue_mut()
            .sort_by(|a, b| b.effective_priority.cmp(&a.effective_priority));

        Ok(())
    }
927 pub fn process_messages(&mut self) -> crate::Result<()>
929 where
930 D: TriadDriver,
931 {
932 self.orchestrator.process_messages(&mut self.fragment)
933 }
934 pub fn run_maintenance(&mut self) -> crate::Result<Vec<usize>>
936 where
937 D: TriadDriver,
938 {
939 let mut task_ids = Vec::new();
940
941 let memory_task_id = self.schedule_task(
943 TaskType::OptimizeMemory { max_features: 5000 },
944 3, );
946 task_ids.push(memory_task_id);
947
948 let knowledge_task_id = self.schedule_knowledge_transfer(6);
950 task_ids.push(knowledge_task_id);
951
952 let balance_task_id = self.schedule_task(
954 TaskType::BalanceResources,
955 4, );
957 task_ids.push(balance_task_id);
958
959 Ok(task_ids)
960 }
961 pub fn runtime_metrics(&self) -> crate::ResourceMetrics {
963 crate::ResourceMetrics::default()
964 .with_cpu_usage(self.system_info.get_cpu_usage())
965 .with_memory_usage(self.system_info.memory_usage())
966 .with_available_memory(self.system_info.get_free_memory_mb())
967 .with_partition_count(self.fragment.partition_count())
968 .with_edge_count(self.fragment.edge_count())
969 .with_vertex_count(self.fragment.vertex_count())
970 .with_queued_tasks(self.task_queue().len())
971 .with_running_tasks(self.orchestrator.active_task_count())
972 .with_learning_nodes(
973 self.fragment
974 .partitions()
975 .values()
976 .filter(|node| node.has_surface_network())
977 .count(),
978 )
979 .with_feature_count(
980 self.fragment
981 .partitions()
982 .values()
983 .map(|node| node.feature_count())
984 .sum::<usize>(),
985 )
986 }
987 pub fn schedule_knowledge_transfer(&mut self, priority: usize) -> usize {
989 self.schedule_task(TaskType::CoordinateLearning, priority)
991 }
992 pub fn schedule_memory_optimization(&mut self) -> crate::Result<usize> {
994 let total_features: usize = self
996 .fragment
997 .partitions()
998 .values()
999 .map(|node| node.feature_count())
1000 .sum();
1001
1002 let node_count = self.fragment.partitions().len().max(1);
1004
1005 let avg_features_per_node = total_features / node_count;
1007
1008 let max_features = if avg_features_per_node > 1000 {
1010 avg_features_per_node * 2 / 3
1012 } else {
1013 avg_features_per_node * 3 / 4
1015 }
1016 .max(500); let priority = if avg_features_per_node > 2000 {
1020 7
1022 } else {
1023 3
1025 };
1026
1027 let task_id = self.schedule_task(TaskType::OptimizeMemory { max_features }, priority);
1029 Ok(task_id)
1030 }
1031 pub fn schedule_task(&mut self, task_type: TaskType, priority: usize) -> usize {
1033 self.scheduler.create_task(task_type, priority)
1035 }
    /// Builds a multi-section plain-text health report: system resources,
    /// tonnetz topology, runtime activity, learning status, capabilities and
    /// a coarse pressure assessment for memory, CPU and the task backlog.
    pub fn system_health_report(&self) -> String {
        let metrics = self.runtime_metrics();
        let info = self.system_info();
        let timestamp = eryon::Timestamp::<u64>::now_in_secs();

        let mut report = String::new();

        report.push_str(&format!("System Health Report - {}\n", timestamp));
        report.push_str("==================================\n\n");

        // -- Host resources --------------------------------------------------
        report.push_str("SYSTEM RESOURCES\n");
        report.push_str("-----------------\n");
        report.push_str(&format!("CPU Cores: {}\n", info.get_cpu_cores()));
        report.push_str(&format!(
            "Memory: {} MB\n",
            info.get_free_memory_mb()
        ));
        report.push_str(&format!(
            "CPU Usage: {:.1}%\n",
            metrics.cpu_usage() * 100.0
        ));
        report.push_str(&format!(
            "Memory Usage: {:.1}%\n",
            metrics.memory_usage() * 100.0
        ));
        report.push_str(&format!(
            "Available Compute: {}%\n\n",
            metrics.available_compute()
        ));

        // -- Hypergraph shape ------------------------------------------------
        report.push_str("TONNETZ TOPOLOGY\n");
        report.push_str("----------------\n");
        report.push_str(&format!(
            "Partitions: {}\n",
            metrics.partition_count()
        ));
        report.push_str(&format!("Vertices: {}\n", metrics.vertex_count()));
        report.push_str(&format!("Edges: {}\n\n", metrics.edge_count()));

        // -- Task activity ---------------------------------------------------
        report.push_str("RUNTIME ACTIVITY\n");
        report.push_str("----------------\n");
        report.push_str(&format!("Queued Tasks: {}\n", metrics.queued_tasks()));
        report.push_str(&format!("Running Tasks: {}\n", metrics.running_tasks()));

        // Histogram of queued tasks by kind.
        let mut task_counts = std::collections::HashMap::new();
        for task in self.task_queue() {
            let key = TaskTypeKey::from(&task.kind);
            *task_counts.entry(key).or_insert(0) += 1;
        }

        if !task_counts.is_empty() {
            report.push_str("Task Distribution:\n");
            for (task_type, count) in task_counts {
                report.push_str(&format!(" - {}: {}\n", task_type, count));
            }
        }
        report.push_str("\n");

        // -- Learning --------------------------------------------------------
        report.push_str("LEARNING STATUS\n");
        report.push_str("--------------\n");
        report.push_str(&format!(
            "Learning Nodes: {}\n",
            metrics.learning_nodes()
        ));
        report.push_str(&format!(
            "Surface Networks: {}\n",
            metrics.surface_network_count()
        ));
        report.push_str(&format!(
            "Features: {}\n\n",
            metrics.feature_count()
        ));

        report.push_str("CAPABILITIES\n");
        report.push_str("-----------\n");
        for capability in metrics.capabilities() {
            report.push_str(&format!("- {}\n", capability));
        }
        report.push_str("\n");

        // -- Coarse pressure assessment --------------------------------------
        report.push_str("PERFORMANCE ASSESSMENT\n");
        report.push_str("---------------------\n");

        // Memory pressure: features relative to a 1000-per-partition budget.
        let memory_ratio =
            (metrics.feature_count() as f32) / (1000.0 * metrics.partition_count().max(1) as f32);
        let memory_status = if memory_ratio > 2.0 {
            "HIGH - Consider memory optimization"
        } else if memory_ratio > 1.0 {
            "MODERATE - Monitor memory growth"
        } else {
            "GOOD - Memory usage is acceptable"
        };
        report.push_str(&format!("Memory Pressure: {}\n", memory_status));

        let cpu_status = if metrics.cpu_usage() > 0.8 {
            "HIGH - Consider scaling out"
        } else if metrics.cpu_usage() > 0.5 {
            "MODERATE - Monitor CPU usage"
        } else {
            "GOOD - CPU usage is acceptable"
        };
        report.push_str(&format!("CPU Pressure: {}\n", cpu_status));

        // Backlog relative to a budget of 2 queued tasks per core.
        let task_ratio = metrics.queued_tasks() as f32 / (info.get_cpu_cores() * 2) as f32;
        let task_status = if task_ratio > 3.0 {
            "HIGH - Large task backlog"
        } else if task_ratio > 1.0 {
            "MODERATE - Tasks may experience delays"
        } else {
            "GOOD - Task processing is timely"
        };
        report.push_str(&format!("Task Backlog: {}\n", task_status));

        report
    }
    /// Scores every learning node (`0.5*accuracy + 0.3*convergence +
    /// 0.2*feature quality`), then nudges all nodes scoring below 0.7 toward
    /// the surface parameters of the single best-scoring node (blend factor
    /// 0.3).
    pub fn tune_surface_networks(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let mut performance_metrics = HashMap::new();

        for (&edge_id, node) in self.fragment.partitions() {
            if node.has_surface_network() {
                let accuracy = node.get_learning_accuracy();
                let convergence_rate = node.get_convergence_rate();
                let feature_quality = node.get_feature_quality();

                // Weighted composite score; weights sum to 1.
                let performance = accuracy * 0.5 + convergence_rate * 0.3 + feature_quality * 0.2;

                performance_metrics.insert(edge_id, performance);
            }
        }

        // No learning nodes at all: nothing to tune.
        if performance_metrics.is_empty() {
            return Ok(());
        }

        // Rank descending by score; NaN comparisons fall back to Equal, so
        // NaN scores neither win nor panic.
        let mut entries: Vec<_> = performance_metrics.iter().collect();
        entries.sort_by(|a, b| b.1.partial_cmp(a.1).unwrap_or(std::cmp::Ordering::Equal));

        // Non-empty map guaranteed above, so `first()` cannot fail.
        let best_node_id = *entries.first().unwrap().0;

        if let Some(best_node) = self.fragment.find_node_by_id(best_node_id) {
            let optimal_params = best_node.get_surface_parameters();

            // Pull under-performers (score < 0.7) toward the best node's
            // parameters; the best node itself is left untouched.
            for (edge_id, performance) in performance_metrics {
                if performance < 0.7 && edge_id != best_node_id {
                    if let Some(node) = self.fragment.find_node_by_id_mut(edge_id) {
                        node.adjust_surface_parameters(&optimal_params, 0.3)?;
                    }
                }
            }
        }

        Ok(())
    }
1214 pub fn voice_leading_distance(&self, from: EdgeId, to: EdgeId) -> Option<usize> {
1217 self.fragment.voice_leading_distance(from, to)
1218 }
1219}
1220
1221impl<D> RuntimeInner<D>
1222where
1223 D: Driver<Triad>,
1224{
    #[doc(hidden)]
    /// Computes a deterministic fingerprint of the extracted knowledge
    /// patterns, iterating node ids in sorted order so HashMap iteration
    /// order cannot affect the result.
    ///
    /// NOTE(review): `DefaultHasher` is NOT cryptographic, only the first 8
    /// of the 32 output bytes carry entropy (the rest stay zero), and the
    /// algorithm is not guaranteed stable across Rust releases — confirm
    /// that is acceptable for a "consensus" value shared between nodes.
    pub fn compute_knowledge_consensus_hash(
        &self,
        patterns: &HashMap<EdgeId, Vec<Vec<usize>>>,
    ) -> [u8; 32] {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::Hasher;

        let mut hasher = DefaultHasher::new();

        // Sort node ids for an order-independent digest.
        let mut ordered_nodes: Vec<_> = patterns.keys().collect();
        ordered_nodes.sort();

        for &node_id in &ordered_nodes {
            if let Some(node_patterns) = patterns.get(node_id) {
                hasher.write_usize(**node_id);
                hasher.write_usize(node_patterns.len());

                // Length-prefix each pattern so concatenations are unambiguous.
                for pattern in node_patterns {
                    hasher.write_usize(pattern.len());
                    for &value in pattern {
                        hasher.write_usize(value);
                    }
                }
            }
        }

        // Widen the 8-byte digest into the 32-byte result
        // (big-endian, zero-padded).
        let hash_value = hasher.finish();
        let mut result = [0u8; 32];
        let bytes = hash_value.to_be_bytes();
        result[..bytes.len()].copy_from_slice(&bytes);

        result
    }
    /// Stamps every partition node's memory with a record of a completed
    /// knowledge-transfer round (consensus hash + participant count), keyed
    /// by the wall-clock second at which it happened.
    #[allow(dead_code)]
    fn register_knowledge_transfer(&mut self, consensus_hash: [u8; 32], node_count: usize) {
        let timestamp = eryon::Timestamp::<u64>::now_in_secs();

        // Seconds-resolution keys can collide if two rounds finish within
        // the same second; the later write wins.
        let transfer_key = format!("knowledge_transfer_{timestamp}");
        let transfer_value = format!("{:?}:{}", consensus_hash, node_count);

        for node in self.fragment.partitions_mut().values_mut() {
            node.memory_mut()
                .set_property(&transfer_key, &transfer_value);
        }

        #[cfg(feature = "tracing")]
        tracing::info!(
            "Knowledge transfer completed: {} nodes, consensus: {:?}",
            node_count,
            &consensus_hash[0..4]
        );
    }
1289}
1290
impl<D> Default for RuntimeInner<D>
where
    D: Driver<Triad>,
{
    /// Equivalent to [`RuntimeInner::new`].
    fn default() -> Self {
        Self::new()
    }
}
1299
impl<D> eryon::Execute<usize> for RuntimeInner<D>
where
    D: TriadDriver,
{
    type Output = crate::Result;
    /// Consumes the runtime and executes the task whose id is `rhs`;
    /// delegates to [`RuntimeInner::execute_task`].
    fn execute(mut self, rhs: usize) -> Self::Output {
        self.execute_task(rhs)
    }
}
1309
impl<D> eryon::ExecuteMut<usize> for RuntimeInner<D>
where
    D: TriadDriver,
{
    type Output = crate::Result;
    /// Executes the task whose id is `rhs` in place; delegates to
    /// [`RuntimeInner::execute_task`].
    fn execute_mut(&mut self, rhs: usize) -> Self::Output {
        self.execute_task(rhs)
    }
}