1use crate::runtime::RuntimeInner;
6
7use crate::actors::prelude::{Driver, TriadDriver, VNode};
8use crate::frag::FragTonnetz;
9use crate::orcha::Orchestrator;
10use crate::scheduler::Scheduler;
11use crate::tasks::{Task, TaskStatus, TaskType, TaskTypeKey};
12use crate::types::{PerformanceTracker, SystemInfo};
13use rshyper::EdgeId;
14use rstmt::nrt::Triad;
15use scsys::Timestamp;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19impl<D> RuntimeInner<D>
20where
21 D: Driver<Triad>,
22{
23 pub fn new() -> Self {
25 let fragment = FragTonnetz::new();
26 let orchestrator = Orchestrator::new();
27 let system_info = SystemInfo::new();
28 let perf_tracker = PerformanceTracker::new();
29
30 Self {
31 fragment,
32 orchestrator,
33 scheduler: Scheduler::new(),
34 perf_tracker,
35 system_info,
36 }
37 }
38 pub fn fragment(&self) -> &FragTonnetz<D> {
40 &self.fragment
41 }
42 pub fn fragment_mut(&mut self) -> &mut FragTonnetz<D> {
44 &mut self.fragment
45 }
46 pub fn orchestrator(&self) -> &Orchestrator {
48 &self.orchestrator
49 }
50 pub fn orchestrator_mut(&mut self) -> &mut Orchestrator {
52 &mut self.orchestrator
53 }
54 pub const fn perf_tracker(&self) -> &PerformanceTracker {
56 &self.perf_tracker
57 }
58 pub fn perf_tracker_mut(&mut self) -> &mut PerformanceTracker {
60 &mut self.perf_tracker
61 }
62 pub fn performance_trend_report(&self) -> String {
64 self.perf_tracker.generate_trend_report()
65 }
66 pub const fn scheduler(&self) -> &Scheduler {
68 &self.scheduler
69 }
70 pub fn scheduler_mut(&mut self) -> &mut Scheduler {
72 &mut self.scheduler
73 }
74 pub const fn system_info(&self) -> &SystemInfo {
76 &self.system_info
77 }
78 pub fn system_info_mut(&mut self) -> &mut SystemInfo {
80 &mut self.system_info
81 }
82 pub fn task_queue(&self) -> &[Task] {
84 &self.scheduler.task_queue
85 }
86 pub fn task_queue_mut(&mut self) -> &mut Vec<Task> {
88 &mut self.scheduler.task_queue
89 }
90 pub fn vnodes(&self) -> &HashMap<EdgeId, VNode<D>> {
92 self.fragment.partitions()
93 }
94 pub fn vnodes_mut(&mut self) -> &mut HashMap<EdgeId, VNode<D>> {
96 self.fragment.partitions_mut()
97 }
98 pub fn set_fragment(&mut self, fragment: FragTonnetz<D>) {
100 self.fragment = fragment;
101 }
102 pub fn set_orchestrator(&mut self, orchestrator: Orchestrator) {
104 self.orchestrator = orchestrator;
105 }
106 pub fn set_scheduler(&mut self, scheduler: Scheduler) {
108 self.scheduler = scheduler;
109 }
110 pub fn with_fragment(self, fragment: FragTonnetz<D>) -> Self {
112 Self { fragment, ..self }
113 }
114 pub fn with_orchestrator(self, orchestrator: Orchestrator) -> Self {
116 Self {
117 orchestrator,
118 ..self
119 }
120 }
121 pub fn with_scheduler(self, scheduler: Scheduler) -> Self {
123 Self { scheduler, ..self }
124 }
125 pub fn update_performance_metrics(&mut self) {
127 let metrics = self.runtime_metrics();
129
130 self.perf_tracker.record_metrics(metrics);
132 }
133}
134
impl<D> RuntimeInner<D>
where
    D: Driver<Triad>,
{
    /// Re-prioritizes queued tasks from four weighted signals — historical
    /// execution time per task type (0.3), inverse system load (0.2), task
    /// age (0.2), and inverse per-node load (0.3) — then re-sorts the queue
    /// by descending effective priority.
    pub fn adaptive_schedule(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        // Per task-type accumulator of (total execution time, completion count).
        let mut task_type_metrics = HashMap::new();

        for completed_task in self.scheduler.completed_tasks() {
            // Missing timestamps degrade gracefully: a missing start falls
            // back to creation time, a missing completion falls back to now.
            let execution_time = completed_task
                .completed_at
                .unwrap_or_else(Instant::now)
                .duration_since(
                    completed_task
                        .started_at
                        .unwrap_or(completed_task.created_at),
                );

            let key = TaskTypeKey::from(&completed_task.kind);

            let entry = task_type_metrics
                .entry(key)
                .or_insert_with(|| (Duration::from_secs(0), 0));

            entry.0 += execution_time;
            entry.1 += 1;
        }

        // Collapse the accumulator into mean execution time per task type.
        let mut avg_execution_times = HashMap::new();
        for (key, (total_time, count)) in task_type_metrics {
            if count > 0 {
                let avg_time = total_time / count as u32;
                avg_execution_times.insert(key, avg_time);
            }
        }

        let system_load = self.system_info.get_current_load();
        let node_loads = self.fragment.get_all_edge_loads();

        let now = Instant::now();
        for task in self.task_queue_mut() {
            if matches!(task.status, TaskStatus::Queued) {
                let base_priority = task.priority as f32;

                // Shorter-running task types score higher; 60s saturates to 0.
                let key = TaskTypeKey::from(&task.kind);
                let time_factor = if let Some(avg_time) = avg_execution_times.get(&key) {
                    let max_time = Duration::from_secs(60);
                    1.0 - (avg_time.as_secs_f32() / max_time.as_secs_f32()).min(1.0)
                } else {
                    // No history yet: neutral score.
                    0.5
                };

                let load_factor = 1.0 - system_load;

                // Age factor saturates at 1.0 after one minute in the queue.
                let age = now.duration_since(task.created_at);
                let age_factor = (age.as_secs_f32() / 60.0).min(1.0);
                // Only node-targeted kinds use a real node load; 0.5 otherwise.
                let node_factor = match &task.kind {
                    TaskType::Transform { node_id, .. } => {
                        1.0 - node_loads.get(node_id).copied().unwrap_or(0.5)
                    }
                    TaskType::LearnPattern { node_id, .. } => {
                        1.0 - node_loads.get(node_id).copied().unwrap_or(0.5)
                    }
                    _ => 0.5,
                };

                let adjusted_priority = base_priority
                    * (0.3 * time_factor
                        + 0.2 * load_factor
                        + 0.2 * age_factor
                        + 0.3 * node_factor);

                task.effective_priority = adjusted_priority.round() as usize;
            }
        }

        // Highest effective priority first.
        self.task_queue_mut()
            .sort_by(|a, b| b.effective_priority.cmp(&a.effective_priority));

        Ok(())
    }
    /// Inserts a triad into the fragment, returning the new edge id.
    pub fn add_triad(&mut self, triad: Triad) -> crate::Result<rshyper::EdgeId> {
        self.fragment.add_triad(triad)
    }
    /// Ensures the fragment holds at least one partition per CPU core;
    /// no-op when already at or above that count.
    pub fn auto_configure(&mut self) -> crate::Result<()> {
        let optimal_partitions = self.system_info.get_cpu_cores().max(1);

        if self.fragment.total_partitions() >= optimal_partitions {
            return Ok(());
        }

        self.init_complete_layer()
    }

    /// Returns the scheduler's completed tasks.
    pub fn completed_tasks(&self) -> Vec<&Task> {
        self.scheduler.completed_tasks()
    }
    /// Federated-style gradient exchange: groups surface-enabled nodes by
    /// triad class, pools their learning gradients per pattern, and applies
    /// the pooled gradients back to every node in the class. Returns the id
    /// of the follow-up `SharePatterns` task.
    pub fn coordinate_distributed_learning(&mut self) -> crate::Result<usize>
    where
        D: TriadDriver,
    {
        let mut nodes_by_class = std::collections::HashMap::new();

        for (&edge_id, node) in self.fragment.partitions() {
            if node.has_surface_network() {
                nodes_by_class
                    .entry(node.class())
                    .or_insert_with(Vec::new)
                    .push(edge_id);
            }
        }

        for (_class, node_ids) in nodes_by_class {
            // A class with a single node has no peer to exchange with.
            if node_ids.len() <= 1 {
                continue;
            }

            let mut all_gradients = Vec::new();

            for node_id in &node_ids {
                if let Some(node) = self.fragment.get_vnode(node_id) {
                    let gradients = node.extract_learning_gradients()?;
                    all_gradients.push((node_id, gradients));
                }
            }

            // NOTE(review): despite the name, this map only *collects*
            // gradients per pattern id — any averaging presumably happens
            // inside `apply_federated_gradients`; confirm.
            let mut averaged_gradients = HashMap::new();

            for (_node_id, gradients) in all_gradients {
                for (pattern_id, gradient) in gradients {
                    averaged_gradients
                        .entry(pattern_id)
                        .or_insert_with(Vec::new)
                        .push(gradient);
                }
            }

            for node_id in &node_ids {
                if let Some(node) = self.fragment.get_vnode_mut(node_id) {
                    node.apply_federated_gradients(&averaged_gradients)?;
                }
            }

            // One evaluation task is scheduled per class that exchanged.
            self.schedule_task(TaskType::EvaluatePerformance, 8);
        }

        let task_id = self.schedule_task(TaskType::SharePatterns, 6);

        Ok(task_id)
    }
    /// Proximity-weighted knowledge transfer: every node with a surface
    /// network and more than 10 features shares extracted patterns with its
    /// peers, scaled by voice-leading proximity; finally records a consensus
    /// hash of the exchanged knowledge on every node.
    pub fn coordinate_knowledge_transfer(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let nodes_with_knowledge: Vec<_> = self
            .fragment
            .partitions()
            .iter()
            .filter(|(_, node)| node.has_surface_network() && node.total_features() > 10)
            .map(|(&id, _)| id)
            .collect();

        // Nothing to transfer with fewer than two knowledgeable nodes.
        if nodes_with_knowledge.len() <= 1 {
            return Ok(());
        }

        let mut knowledge_patterns = HashMap::new();
        for node_id in &nodes_with_knowledge {
            if let Some(node) = self.fragment.get_vnode(node_id) {
                match node.extract_knowledge_patterns() {
                    Ok(patterns) => {
                        if !patterns.is_empty() {
                            knowledge_patterns.insert(*node_id, patterns);
                        }
                    }
                    // Extraction failures are logged (when tracing is on)
                    // and skipped rather than aborting the whole transfer.
                    Err(_e) => {
                        #[cfg(feature = "tracing")]
                        tracing::warn!(
                            "Failed to extract knowledge from node {}: {:?}",
                            node_id,
                            _e
                        );
                        continue;
                    }
                }
            }
        }

        if knowledge_patterns.is_empty() {
            return Ok(());
        }

        let consensus_hash = self.compute_knowledge_consensus_hash(&knowledge_patterns);

        // Plan phase: decide (target, [(source, patterns)]) pairs before
        // taking any mutable borrows of the fragment.
        let mut transfer_data = Vec::new();

        for &target_id in &nodes_with_knowledge {
            let mut source_pattern_data = Vec::new();

            for (&source_id, patterns) in &knowledge_patterns {
                if source_id != target_id {
                    // Proximity decays with voice-leading distance; 0.5 when
                    // no path exists between the two nodes.
                    let proximity = if let Some(distance) =
                        self.fragment.voice_leading_distance(&target_id, &source_id)
                    {
                        1.0 / (1.0 + distance as f32 / 6.0)
                    } else {
                        0.5
                    };

                    // Skip distant pairs entirely (proximity <= 0.3).
                    if proximity > 0.3 {
                        let pattern_count = (patterns.len() as f32 * proximity).ceil() as usize;
                        if pattern_count > 0 {
                            let selected_patterns = if pattern_count >= patterns.len() {
                                patterns.clone()
                            } else {
                                patterns.iter().take(pattern_count).cloned().collect()
                            };

                            source_pattern_data.push((source_id, selected_patterns));
                        }
                    }
                }
            }

            if !source_pattern_data.is_empty() {
                transfer_data.push((target_id, source_pattern_data));
            }
        }

        // Apply phase: integrate planned patterns; integration failures are
        // reported but do not abort the remaining transfers.
        for (target_id, source_patterns) in transfer_data {
            if let Some(target_node) = self.fragment.get_vnode_mut(&target_id) {
                for (source_id, patterns) in source_patterns {
                    if let Err(e) = target_node.integrate_external_knowledge(&patterns, &source_id)
                    {
                        #[cfg(feature = "tracing")]
                        tracing::error!("Failed to integrate knowledge: {:?}", e);
                        #[cfg(not(feature = "tracing"))]
                        eprintln!("Failed to integrate knowledge: {:?}", e);
                    }
                }
            }
        }

        self.register_knowledge_transfer(consensus_hash, nodes_with_knowledge.len());

        Ok(())
    }
    /// Simpler all-to-all variant of knowledge coordination: each node with
    /// any features broadcasts its patterns to up to 5 peers, then every
    /// node records a `knowledge_coordination` event.
    pub fn coordinate_learning(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let nodes_with_knowledge: Vec<_> = self
            .fragment
            .partitions()
            .iter()
            .filter(|(_, node)| node.has_surface_network() && node.total_features() > 0)
            .map(|(&id, _)| id)
            .collect();

        if nodes_with_knowledge.len() <= 1 {
            return Ok(());
        }

        let mut extracted_patterns = Vec::with_capacity(nodes_with_knowledge.len());

        for &node_id in &nodes_with_knowledge {
            if let Some(node) = self.get_vnode(&node_id) {
                match node.extract_knowledge_patterns() {
                    Ok(patterns) if !patterns.is_empty() => {
                        extracted_patterns.push((node_id, patterns));
                    }
                    // Empty or failed extractions are silently skipped here
                    // (unlike `coordinate_knowledge_transfer`, no logging).
                    _ => continue,
                }
            }
        }

        // Fan-out cap keeps the exchange bounded on large fragments.
        let max_targets_per_source = core::cmp::min(5, nodes_with_knowledge.len() - 1);

        for (source_id, patterns) in extracted_patterns {
            if patterns.is_empty() {
                continue;
            }

            let mut target_count = 0;

            for target_id in &nodes_with_knowledge {
                if target_id != &source_id && target_count < max_targets_per_source {
                    if let Some(target_node) = self.get_vnode_mut(target_id) {
                        if let Err(_e) =
                            target_node.integrate_external_knowledge(&patterns, &source_id)
                        {
                            #[cfg(feature = "tracing")]
                            tracing::error!("Failed to integrate knowledge: {_e:?}");
                            #[cfg(not(feature = "tracing"))]
                            eprintln!("Failed to integrate knowledge: {_e:?}");
                        }

                        target_count += 1;
                    }
                }
            }
        }

        // Audit trail: every partition notes how many nodes participated.
        for node in self.fragment_mut().partitions_mut().values_mut() {
            node.store_mut().record_event(
                "knowledge_coordination",
                Some(vec![nodes_with_knowledge.len()]),
            );
        }

        Ok(())
    }
    /// Executes a single scheduled task by id via the orchestrator.
    ///
    /// # Errors
    /// Returns `RuntimeError::TaskNotFound` when `task_id` is unknown to the
    /// scheduler; otherwise propagates the orchestrator's result.
    pub fn execute_task(&mut self, task_id: usize) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let task = self
            .scheduler
            .get_task(task_id)
            .ok_or_else(|| crate::RuntimeError::TaskNotFound(task_id))?;
        self.orchestrator.execute_task(&mut self.fragment, task)
    }
    /// Processes one batch of queued tasks; the batch result itself is
    /// discarded, only the error (if any) propagates.
    pub fn execute_tasks(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let _res = self
            .scheduler
            .process_batch(&mut self.fragment, &mut self.orchestrator)?;
        Ok(())
    }
    /// Looks up a virtual node by edge id.
    pub fn get_vnode(&self, id: &EdgeId) -> Option<&VNode<D>> {
        self.fragment().get_vnode(id)
    }
    /// Looks up a virtual node by edge id, mutably.
    pub fn get_vnode_mut(&mut self, id: &EdgeId) -> Option<&mut VNode<D>> {
        self.fragment_mut().get_vnode_mut(id)
    }
    /// Populates the fragment with every triad and (re)initializes the
    /// orchestrator against the resulting topology.
    pub fn init_complete_layer(&mut self) -> crate::Result<()> {
        self.fragment.initialize_with_all_triads()?;

        self.orchestrator.initialize(&self.fragment);

        Ok(())
    }
    /// Initializes a surface network on every node that lacks one; fails
    /// fast on the first node whose initialization errors.
    pub fn init_node_surfaces(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let mut initialized_count = 0;

        for node in self.fragment.partitions_mut().values_mut() {
            if node.has_surface_network() {
                continue;
            }

            node.init_surface()?;

            initialized_count += 1;
        }

        if initialized_count > 0 {
            #[cfg(feature = "tracing")]
            tracing::debug!("Initialized {} node surface networks", initialized_count);
        }

        Ok(())
    }
    /// Applies per-triad-class surface model parameters to nodes. The three
    /// tuple values are passed positionally to `SurfaceModelConfig::new`;
    /// their semantics are defined by that constructor (not visible here).
    pub fn init_specialized_surfaces(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        use crate::nrt::Triads::*;
        let mut nodes_by_class = std::collections::HashMap::new();

        for (&edge_id, node) in self.fragment.partitions() {
            nodes_by_class
                .entry(node.class())
                .or_insert_with(Vec::new)
                .push(edge_id);
        }

        // Class-specific parameter triples; unknown classes fall back to
        // (0.01, 0.7, 0.5) below.
        let class_params: HashMap<_, _> = [
            (Major, (0.02, 0.6, 0.4)),
            (Minor, (0.01, 0.8, 0.5)),
            (Diminished, (0.04, 0.7, 0.6)),
            (Augmented, (0.05, 0.5, 0.8)),
        ]
        .into_iter()
        .collect();

        for (class, node_ids) in &nodes_by_class {
            let params = class_params.get(class).unwrap_or(&(0.01, 0.7, 0.5));

            for edge_id in node_ids {
                if let Some(node) = self.fragment.get_vnode_mut(edge_id) {
                    // NOTE(review): only nodes *without* a surface network
                    // are configured, yet `surface_mut()` is then consulted —
                    // confirm it yields a configurable default in that state.
                    if !node.has_surface_network() {
                        let config = crate::actors::surface::SurfaceModelConfig::new(
                            params.0, params.1, params.2,
                        );
                        node.surface_mut().map(|i| i.model_mut().set_config(config));
                    }
                }
            }
        }

        #[cfg(feature = "tracing")]
        tracing::debug!(
            "Initialized specialized surface networks for {} classes",
            nodes_by_class.len()
        );

        Ok(())
    }
    /// Trims feature-heavy nodes: any node holding more than 1.5x the mean
    /// feature count gets similar patterns consolidated (threshold 0.8) and
    /// low-importance features pruned (threshold 0.2), heaviest first.
    pub fn optimize_memory_allocation(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let mut node_memory_usage = HashMap::new();
        let mut total_features = 0;

        for (&edge_id, node) in self.fragment.partitions() {
            let feature_count = node.total_features();
            node_memory_usage.insert(edge_id, feature_count);
            total_features += feature_count;
        }

        // Guard against divide-by-zero on an empty fragment.
        if self.fragment.total_partitions() == 0 {
            return Ok(());
        }

        let avg_features = total_features / self.fragment.total_partitions();
        let target_max = (avg_features as f32 * 1.5) as usize;
        let mut nodes_to_optimize = Vec::new();
        for (&edge_id, &feature_count) in &node_memory_usage {
            if feature_count > target_max {
                nodes_to_optimize.push((edge_id, feature_count));
            }
        }

        // Heaviest nodes first.
        nodes_to_optimize.sort_by(|a, b| b.1.cmp(&a.1));

        for (edge_id, _) in nodes_to_optimize {
            if let Some(node) = self.fragment.get_vnode_mut(&edge_id) {
                node.consolidate_similar_patterns(0.8)?;

                node.prune_low_importance_features(0.2)?;
            }
        }

        Ok(())
    }
    /// For each triad class, picks the surface-enabled node with the most
    /// features as "best" and queues `PatternShare` messages from it to its
    /// class peers, then drains the message queue.
    pub fn optimize_surface_networks(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let mut nodes_by_class = std::collections::HashMap::new();

        for (&edge_id, node) in self.fragment.partitions() {
            nodes_by_class
                .entry(node.class())
                .or_insert_with(Vec::new)
                .push(edge_id);
        }

        for (_class, node_ids) in nodes_by_class {
            if node_ids.len() <= 1 {
                continue;
            }

            let mut best_node_id = None;
            let mut most_features = 0;

            for id in &node_ids {
                if let Some(node) = self.fragment.get_vnode(id) {
                    let features = node.total_features();
                    if features > most_features && node.has_surface_network() {
                        best_node_id = Some(*id);
                        most_features = features;
                    }
                }
            }

            if let Some(source_id) = best_node_id {
                for &target_id in &node_ids {
                    if target_id != source_id {
                        // NOTE(review): the pattern payload is empty and
                        // `target_id` is not carried in the message — confirm
                        // the orchestrator resolves recipients elsewhere.
                        self.orchestrator
                            .send_message(crate::NodeMessage::PatternShare {
                                source: source_id,
                                pattern: vec![],
                                importance: 0.8,
                            });
                    }
                }
            }
        }

        self.process_messages()?;

        Ok(())
    }
    /// Static per-task-type resource estimate as `(cpu, memory)`.
    /// Units are implied by the consumers in `predictive_schedule`:
    /// cpu is later divided by 100 (percent), memory by free MB.
    pub fn predict_task_resources(&self, task_type: &TaskType) -> (f32, f32) {
        match TaskTypeKey::from(task_type) {
            TaskTypeKey::Transform => (5.0, 50.0),
            TaskTypeKey::LearnPattern => (20.0, 100.0),
            TaskTypeKey::SharePatterns => (10.0, 80.0),
            TaskTypeKey::OptimizeNode => (15.0, 30.0),
            _ => (5.0, 20.0),
        }
    }
    /// Best-fit bin-packing of queued tasks over a 10-slot forecast of CPU
    /// and memory usage spanning `look_ahead_ms`; assigns each placed task a
    /// `scheduled_delay` corresponding to its chosen slot.
    pub fn predictive_schedule(&mut self, look_ahead_ms: u64) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let current_cpu = self.system_info.get_cpu_usage();
        let current_memory = self.system_info.get_ram_memory_usage();
        let available_memory =
            self.system_info.get_free_memory_mb() as f32 * (1.0 - current_memory);

        // Computed but currently unused; kept for future headroom logic.
        let _cpu_headroom = 1.0 - current_cpu;
        let _mem_headroom = available_memory;

        // Slot 0 reflects the present; slots 1..N are forecast below.
        let mut cpu_timeline = vec![current_cpu];
        let mut mem_timeline = vec![current_memory];
        let timeline_slots = 10;
        let running_tasks = self.scheduler.get_running_tasks();

        for slot in 1..timeline_slots {
            let slot_time = Instant::now()
                + Duration::from_millis(look_ahead_ms * slot as u64 / timeline_slots as u64);

            let mut cpu_usage = current_cpu;
            let mut mem_usage = current_memory;

            // A running task contributes to a slot only while its estimated
            // completion time still lies beyond that slot.
            for task in &running_tasks {
                let task_started = task.started_at.unwrap_or(task.created_at);
                let (task_cpu, task_mem) = self.predict_task_resources(&task.kind);

                let est_duration = {
                    let task_key = TaskTypeKey::from(&task.kind);
                    self.scheduler.estimate_task_duration(task_key)
                };
                let expected_completion = task_started + est_duration;

                if expected_completion > slot_time {
                    cpu_usage += task_cpu / 100.0;
                    mem_usage += task_mem / self.system_info.get_free_memory_mb() as f32;
                }
            }

            // Clamp forecasts into [0.1, 1.0].
            cpu_usage = cpu_usage.min(1.0).max(0.1);
            mem_usage = mem_usage.min(1.0).max(0.1);

            cpu_timeline.push(cpu_usage);
            mem_timeline.push(mem_usage);
        }

        let mut scheduled_tasks = Vec::new();

        for task in self.task_queue() {
            if !matches!(task.status, TaskStatus::Queued) {
                continue;
            }

            let (task_cpu, task_mem) = self.predict_task_resources(&task.kind);
            let normalized_cpu = task_cpu / 100.0;
            let normalized_mem = task_mem / self.system_info.get_free_memory_mb() as f32;

            let mut best_slot = 0;
            let mut best_fit = f32::MAX;

            // Lower "fit" = the task consumes a smaller fraction of the
            // slot's remaining headroom.
            for slot in 0..timeline_slots {
                let slot_cpu = cpu_timeline[slot];
                let slot_mem = mem_timeline[slot];

                if slot_cpu + normalized_cpu <= 1.0 && slot_mem + normalized_mem <= 1.0 {
                    let fit = normalized_cpu / (1.0 - slot_cpu) + normalized_mem / (1.0 - slot_mem);

                    if fit < best_fit {
                        best_fit = fit;
                        best_slot = slot;
                    }
                }
            }

            // best_fit < MAX means at least one slot could host the task.
            if best_fit < f32::MAX {
                let delay_ms = look_ahead_ms * best_slot as u64 / timeline_slots as u64;

                scheduled_tasks.push((task.id, delay_ms));

                // NOTE(review): `best_slot < timeline_slots` is always true
                // (best_slot comes from 0..timeline_slots); harmless but dead.
                if best_slot < timeline_slots {
                    // Reserve the task's resources from its slot onward.
                    for slot in best_slot..timeline_slots {
                        cpu_timeline[slot] += normalized_cpu;
                        mem_timeline[slot] += normalized_mem;
                    }
                }
            }
        }

        for (task_id, delay_ms) in scheduled_tasks {
            if let Some(task) = self.task_queue_mut().iter_mut().find(|t| t.id == task_id) {
                task.scheduled_delay = Some(Duration::from_millis(delay_ms));
            }
        }

        Ok(())
    }
    /// Re-weights queued, node-targeted tasks by their voice-leading
    /// distance from the observer node (closer = higher priority), then
    /// re-sorts the queue by descending effective priority.
    pub fn prioritize_tonnetz_tasks(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let mut observer_position = None;

        for (&edge_id, node) in self.fragment.partitions() {
            if matches!(node.operator(), crate::actors::Operator::Observer(_)) {
                observer_position = Some(edge_id);
                break;
            }
        }

        // Without an observer, fall back to an arbitrary partition (or the
        // default edge id on an empty fragment).
        let observer_position = observer_position.unwrap_or_else(|| {
            self.fragment
                .partitions()
                .keys()
                .next()
                .copied()
                .unwrap_or_default()
        });

        // Collect (index, new priority) pairs first, then apply, so the
        // immutable queue walk and the mutable update don't overlap.
        let mut task_adjustments = Vec::new();

        for (idx, task) in self.task_queue().iter().enumerate() {
            if !matches!(task.status, TaskStatus::Queued) {
                continue;
            }

            let affected_node = match &task.kind {
                TaskType::Transform { node_id, .. } => Some(*node_id),
                TaskType::OptimizeNode { node_id, .. } => Some(*node_id),
                TaskType::ChangeNodeMode(node_id, _) => Some(*node_id),
                TaskType::LearnPattern { node_id, .. } => Some(*node_id),
                TaskType::EvaluateEnsemble { node_id, .. } => Some(*node_id),
                _ => None,
            };

            if let Some(node_id) = affected_node {
                if let Some(distance) = self.voice_leading_distance(&observer_position, &node_id) {
                    // Distance is expressed in octave layers (12 steps each);
                    // factor = 1 / (1 + layers).
                    let octave_layer = distance as f32 / 12.0;
                    let distance_factor = (1.0 + octave_layer).recip();

                    let base_priority = task.priority as f32;
                    let new_priority = (base_priority * distance_factor).round() as usize;

                    task_adjustments.push((idx, new_priority));
                }
            }
        }

        for (idx, priority) in task_adjustments {
            if let Some(task) = self.task_queue_mut().get_mut(idx) {
                task.effective_priority = priority;
            }
        }

        self.task_queue_mut()
            .sort_by(|a, b| b.effective_priority.cmp(&a.effective_priority));

        Ok(())
    }
    /// Drains the orchestrator's pending node messages against the fragment.
    pub fn process_messages(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        self.orchestrator.process_messages(&mut self.fragment)
    }
    /// Schedules the periodic maintenance trio — memory optimization,
    /// knowledge transfer, resource balancing — and returns their task ids
    /// in that order.
    pub fn run_maintenance(&mut self) -> crate::Result<Vec<usize>>
    where
        D: TriadDriver,
    {
        let mut task_ids = Vec::new();

        let memory_task_id = self.schedule_task(
            TaskType::OptimizeMemory { max_features: 5000 },
            3,
        );
        task_ids.push(memory_task_id);

        let knowledge_task_id = self.schedule_knowledge_transfer(6);
        task_ids.push(knowledge_task_id);

        let balance_task_id = self.schedule_task(
            TaskType::BalanceResources,
            4,
        );
        task_ids.push(balance_task_id);

        Ok(task_ids)
    }
    /// Snapshots current resource metrics from the system info, fragment,
    /// scheduler, and orchestrator into a `ResourceMetrics` value.
    pub fn runtime_metrics(&self) -> crate::ResourceMetrics {
        crate::ResourceMetrics::default()
            .with_cpu_usage(self.system_info.get_cpu_usage())
            .with_memory_usage(self.system_info.memory_usage())
            .with_available_memory(self.system_info.get_free_memory_mb())
            .with_partition_count(self.fragment.total_partitions())
            .with_edge_count(self.fragment.total_edges())
            .with_vertex_count(self.fragment.total_nodes())
            .with_queued_tasks(self.task_queue().len())
            .with_running_tasks(self.orchestrator.active_task_count())
            .with_learning_nodes(
                self.fragment
                    .partitions()
                    .values()
                    .filter(|node| node.has_surface_network())
                    .count(),
            )
            .with_feature_count(
                self.fragment
                    .partitions()
                    .values()
                    .map(|node| node.total_features())
                    .sum::<usize>(),
            )
    }
    /// Schedules a `CoordinateLearning` task at the given priority.
    pub fn schedule_knowledge_transfer(&mut self, priority: usize) -> usize {
        self.schedule_task(TaskType::CoordinateLearning, priority)
    }
    /// Schedules an `OptimizeMemory` task whose feature cap and priority are
    /// derived from the current mean feature count per node; the cap never
    /// drops below 500. Returns the task id.
    pub fn schedule_memory_optimization(&mut self) -> crate::Result<usize> {
        let total_features: usize = self
            .fragment
            .partitions()
            .values()
            .map(|node| node.total_features())
            .sum();

        // `.max(1)` guards the division on an empty fragment.
        let node_count = self.fragment.partitions().len().max(1);

        let avg_features_per_node = total_features / node_count;

        // Heavier averages get a more aggressive (2/3) cap.
        let max_features = if avg_features_per_node > 1000 {
            avg_features_per_node * 2 / 3
        } else {
            avg_features_per_node * 3 / 4
        }
        .max(500);
        let priority = if avg_features_per_node > 2000 {
            7
        } else {
            3
        };

        let task_id = self.schedule_task(TaskType::OptimizeMemory { max_features }, priority);
        Ok(task_id)
    }
    /// Creates a task in the scheduler and returns its id.
    pub fn schedule_task(&mut self, task_type: TaskType, priority: usize) -> usize {
        self.scheduler.create_task(task_type, priority)
    }
    /// Builds a human-readable, multi-section health report covering system
    /// resources, topology, runtime activity, learning status, capabilities,
    /// and a qualitative pressure assessment.
    pub fn system_health_report(&self) -> String {
        let metrics = self.runtime_metrics();
        let info = self.system_info();
        let timestamp = Timestamp::<u64>::now();

        let mut report = String::new();

        report.push_str(&format!("System Health Report - {}\n", timestamp));
        report.push_str("==================================\n\n");

        report.push_str("SYSTEM RESOURCES\n");
        report.push_str("-----------------\n");
        report.push_str(&format!("CPU Cores: {}\n", info.get_cpu_cores()));
        report.push_str(&format!(
            "Memory: {} MB\n",
            info.get_free_memory_mb()
        ));
        report.push_str(&format!(
            "CPU Usage: {:.1}%\n",
            metrics.cpu_usage() * 100.0
        ));
        report.push_str(&format!(
            "Memory Usage: {:.1}%\n",
            metrics.memory_usage() * 100.0
        ));
        report.push_str(&format!(
            "Available Compute: {}%\n\n",
            metrics.available_compute()
        ));

        report.push_str("TONNETZ TOPOLOGY\n");
        report.push_str("----------------\n");
        report.push_str(&format!(
            "Partitions: {}\n",
            metrics.partition_count()
        ));
        report.push_str(&format!("Vertices: {}\n", metrics.vertex_count()));
        report.push_str(&format!("Edges: {}\n\n", metrics.edge_count()));

        report.push_str("RUNTIME ACTIVITY\n");
        report.push_str("----------------\n");
        report.push_str(&format!("Queued Tasks: {}\n", metrics.queued_tasks()));
        report.push_str(&format!("Running Tasks: {}\n", metrics.running_tasks()));

        // Histogram of queued tasks by task-type key.
        let mut task_counts = std::collections::HashMap::new();
        for task in self.task_queue() {
            let key = TaskTypeKey::from(&task.kind);
            *task_counts.entry(key).or_insert(0) += 1;
        }

        if !task_counts.is_empty() {
            report.push_str("Task Distribution:\n");
            for (task_type, count) in task_counts {
                report.push_str(&format!("  - {}: {}\n", task_type, count));
            }
        }
        report.push('\n');

        report.push_str("LEARNING STATUS\n");
        report.push_str("--------------\n");
        report.push_str(&format!(
            "Learning Nodes: {}\n",
            metrics.learning_nodes()
        ));
        report.push_str(&format!(
            "Surface Networks: {}\n",
            metrics.surface_network_count()
        ));
        report.push_str(&format!(
            "Features: {}\n\n",
            metrics.feature_count()
        ));

        report.push_str("CAPABILITIES\n");
        report.push_str("-----------\n");
        for capability in metrics.capabilities() {
            report.push_str(&format!("- {}\n", capability));
        }
        report.push('\n');

        report.push_str("PERFORMANCE ASSESSMENT\n");
        report.push_str("---------------------\n");

        // Memory pressure heuristic: features per partition vs a 1000
        // features-per-partition baseline.
        let memory_ratio =
            (metrics.feature_count() as f32) / (1000.0 * metrics.partition_count().max(1) as f32);
        let memory_status = if memory_ratio > 2.0 {
            "HIGH - Consider memory optimization"
        } else if memory_ratio > 1.0 {
            "MODERATE - Monitor memory growth"
        } else {
            "GOOD - Memory usage is acceptable"
        };
        report.push_str(&format!("Memory Pressure: {}\n", memory_status));

        let cpu_status = if metrics.cpu_usage() > &0.8 {
            "HIGH - Consider scaling out"
        } else if metrics.cpu_usage() > &0.5 {
            "MODERATE - Monitor CPU usage"
        } else {
            "GOOD - CPU usage is acceptable"
        };
        report.push_str(&format!("CPU Pressure: {}\n", cpu_status));

        // Backlog heuristic: queued tasks vs twice the core count.
        let task_ratio = metrics.queued_tasks() as f32 / (info.get_cpu_cores() * 2) as f32;
        let task_status = if task_ratio > 3.0 {
            "HIGH - Large task backlog"
        } else if task_ratio > 1.0 {
            "MODERATE - Tasks may experience delays"
        } else {
            "GOOD - Task processing is timely"
        };
        report.push_str(&format!("Task Backlog: {}\n", task_status));

        report
    }
    /// Scores every surface-enabled node (accuracy 0.5 + convergence 0.3 +
    /// feature quality 0.2), then nudges under-performers (score < 0.7)
    /// toward the best node's surface parameters with blend factor 0.3.
    pub fn tune_surface_networks(&mut self) -> crate::Result<()>
    where
        D: TriadDriver,
    {
        let mut performance_metrics = HashMap::new();

        for (&edge_id, node) in self.fragment.partitions() {
            if node.has_surface_network() {
                let accuracy = node.get_learning_accuracy();
                let convergence_rate = node.get_convergence_rate();
                let feature_quality = node.get_feature_quality();

                let performance = accuracy * 0.5 + convergence_rate * 0.3 + feature_quality * 0.2;

                performance_metrics.insert(edge_id, performance);
            }
        }

        if performance_metrics.is_empty() {
            return Ok(());
        }

        // Descending by score; NaN scores compare as equal and keep order.
        let mut entries: Vec<_> = performance_metrics.iter().collect();
        entries.sort_by(|a, b| b.1.partial_cmp(a.1).unwrap_or(std::cmp::Ordering::Equal));

        // Safe: performance_metrics is non-empty, so entries has a first.
        let best_node_id = *entries.first().unwrap().0;

        if let Some(best_node) = self.fragment().get_vnode(&best_node_id) {
            let optimal_params = best_node.get_surface_parameters();

            for (edge_id, performance) in performance_metrics {
                if performance < 0.7 && edge_id != best_node_id {
                    if let Some(node) = self.fragment_mut().get_vnode_mut(&edge_id) {
                        node.adjust_surface_parameters(&optimal_params, 0.3)?;
                    }
                }
            }
        }

        Ok(())
    }
    /// Voice-leading distance between two edges, per the fragment; `None`
    /// when no path exists.
    pub fn voice_leading_distance(&self, from: &EdgeId, to: &EdgeId) -> Option<usize> {
        self.fragment().voice_leading_distance(from, to)
    }
}
1222
impl<D> RuntimeInner<D>
where
    D: Driver<Triad>,
{
    /// Computes a deterministic digest of a pattern map for consensus
    /// bookkeeping. Nodes are visited in sorted order so the result is
    /// independent of `HashMap` iteration order.
    ///
    /// Only the first 8 bytes of the returned array carry hash data (a
    /// big-endian `u64` from `DefaultHasher`); the remaining 24 are zero.
    ///
    /// NOTE(review): `DefaultHasher`'s algorithm is not guaranteed stable
    /// across Rust releases — confirm this hash is never persisted or
    /// compared across differently-compiled builds.
    #[doc(hidden)]
    pub fn compute_knowledge_consensus_hash(
        &self,
        patterns: &HashMap<EdgeId, Vec<Vec<usize>>>,
    ) -> [u8; 32] {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::Hasher;

        let mut hasher = DefaultHasher::new();

        // Sort node ids to make the digest order-independent.
        let mut ordered_nodes: Vec<_> = patterns.keys().collect();
        ordered_nodes.sort();

        for &node_id in &ordered_nodes {
            if let Some(node_patterns) = patterns.get(node_id) {
                // Lengths are hashed alongside values to disambiguate
                // concatenations of different pattern shapes.
                hasher.write_usize(**node_id);
                hasher.write_usize(node_patterns.len());

                for pattern in node_patterns {
                    hasher.write_usize(pattern.len());
                    for &value in pattern {
                        hasher.write_usize(value);
                    }
                }
            }
        }

        // Widen the 8-byte hash into the 32-byte result; tail stays zeroed.
        let hash_value = hasher.finish();
        let mut result = [0u8; 32];
        let bytes = hash_value.to_be_bytes();
        result[..bytes.len()].copy_from_slice(&bytes);

        result
    }
    /// Records a completed knowledge transfer on every partition's store as
    /// a timestamped property, and logs it when tracing is enabled.
    #[allow(dead_code)]
    fn register_knowledge_transfer(&mut self, consensus_hash: [u8; 32], node_count: usize) {
        let timestamp = Timestamp::<u64>::now();

        // Key is unique per transfer via the timestamp; value packs the
        // debug-formatted hash and the participant count.
        let transfer_key = format!("knowledge_transfer_{timestamp}");
        let transfer_value = format!("{:?}:{}", consensus_hash, node_count);

        for node in self.fragment.partitions_mut().values_mut() {
            node.store_mut()
                .set_property(&transfer_key, &transfer_value);
        }

        #[cfg(feature = "tracing")]
        tracing::info!(
            "Knowledge transfer completed: {} nodes, consensus: {:?}",
            node_count,
            &consensus_hash[0..4]
        );
    }
}
1292
1293impl<D> Default for RuntimeInner<D>
1294where
1295 D: Driver<Triad>,
1296{
1297 fn default() -> Self {
1298 Self::new()
1299 }
1300}
1301
1302impl<D> eryon::Execute<usize> for RuntimeInner<D>
1303where
1304 D: TriadDriver,
1305{
1306 type Output = crate::Result;
1307 fn execute(mut self, rhs: usize) -> Self::Output {
1308 self.execute_task(rhs)
1309 }
1310}
1311
1312impl<D> eryon::ExecuteMut<usize> for RuntimeInner<D>
1313where
1314 D: TriadDriver,
1315{
1316 type Output = crate::Result;
1317 fn execute_mut(&mut self, rhs: usize) -> Self::Output {
1318 self.execute_task(rhs)
1319 }
1320}