// eryon_rt/orcha.rs

/*
    Appellation: orchestrator <module>
    Contrib: @FL03
*/
5use crate::NodeMessage;
6use crate::frag::FragTonnetz;
7use crate::tasks::{Task, TaskStatus, TaskType};
8use eryon::ops::Execute;
9use eryon_actors::prelude::{Actor, Driver, Operator, OperatorKind, TriadDriver, VNode};
10use rshyper::EdgeId;
11use rstmt::nrt::Triad;
12use scsys::Timestamp;
13use std::collections::{HashMap, HashSet, VecDeque};
14
15/// The orchestrator is responsible for managing communications between partitions,
16/// ensuring synchronized state transitions, and facilitating coordinated operations
17/// across the fragment.
18#[derive(Clone, Debug)]
19pub struct Orchestrator {
20    /// Maps task IDs to their execution status
21    pub(crate) task_statuses: HashMap<usize, TaskStatus>,
22    /// Current active tasks running in the system
23    pub(crate) active_tasks: HashSet<usize>,
24    /// Keeps track of nodes that have recently communicated
25    pub(crate) active_nodes: HashSet<EdgeId>,
26    /// Message queue for inter-node communication
27    pub(crate) message_queue: VecDeque<NodeMessage>,
28    /// Next available task ID
29    pub(crate) next_task_id: usize,
30}
31
32impl Default for Orchestrator {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl Orchestrator {
39    /// Create a new orchestrator
40    pub fn new() -> Self {
41        Orchestrator {
42            task_statuses: HashMap::new(),
43            active_tasks: HashSet::new(),
44            active_nodes: HashSet::new(),
45            message_queue: VecDeque::new(),
46            next_task_id: 0,
47        }
48    }
49    /// Get the count of active nodes
50    pub fn active_node_count(&self) -> usize {
51        self.active_nodes.len()
52    }
53    /// Get the count of active tasks
54    pub fn active_task_count(&self) -> usize {
55        self.active_tasks.len()
56    }
57    /// Check if a node is active
58    pub fn is_node_active(&self, node_id: EdgeId) -> bool {
59        self.active_nodes.contains(&node_id)
60    }
61}
62
63impl Orchestrator {
64    /// Get all active node IDs
65    pub fn active_node_ids(&self) -> Vec<EdgeId> {
66        self.active_nodes.iter().copied().collect()
67    }
68    /// Coordinate knowledge sharing between nodes
69    pub fn coordinate_knowledge_sharing<D>(
70        &mut self,
71        fragment: &mut FragTonnetz<D>,
72    ) -> crate::Result<()>
73    where
74        D: TriadDriver,
75    {
76        // 1. Identify knowledge-bearing nodes
77        let nodes_with_knowledge: Vec<_> = fragment
78            .partitions()
79            .iter()
80            .filter(|(_, node)| node.has_surface_network() && node.total_features() > 0)
81            .map(|(&id, _)| id)
82            .collect();
83
84        if nodes_with_knowledge.len() <= 1 {
85            return Ok(()); // Nothing to coordinate
86        }
87
88        // 2. Extract patterns once per node - this is our main optimization
89        let mut extracted_patterns = Vec::with_capacity(nodes_with_knowledge.len());
90
91        for &node_id in &nodes_with_knowledge {
92            if let Some(node) = fragment.get_vnode(&node_id) {
93                match node.extract_knowledge_patterns() {
94                    Ok(patterns) if !patterns.is_empty() => {
95                        extracted_patterns.push((node_id, patterns));
96                    }
97                    _ => continue,
98                }
99            }
100        }
101
102        // 3. Simple filtering: Share each source with at most 5 target nodes
103        // This limits overall complexity without complex musical calculations
104        let max_targets_per_source = 5;
105
106        // 4. Perform transfers using the extracted patterns
107        for (source_id, patterns) in extracted_patterns {
108            // Skip empty pattern sets
109            if patterns.is_empty() {
110                continue;
111            }
112
113            // Choose target nodes (up to max_targets_per_source)
114            let mut target_count = 0;
115
116            for &target_id in &nodes_with_knowledge {
117                if target_id != source_id && target_count < max_targets_per_source {
118                    if let Some(target_node) = fragment.get_vnode_mut(&target_id) {
119                        if let Err(_e) =
120                            target_node.integrate_external_knowledge(&patterns, &source_id)
121                        {
122                            #[cfg(feature = "tracing")]
123                            tracing::warn!("Failed to integrate knowledge: {:?}", _e);
124                        }
125
126                        target_count += 1;
127                    }
128                }
129            }
130        }
131
132        // 5. Record coordination event
133        for node in fragment.partitions_mut().values_mut() {
134            node.store_mut().record_event(
135                "knowledge_coordination",
136                Some(vec![nodes_with_knowledge.len()]),
137            );
138        }
139
140        Ok(())
141    }
142    /// Create decision task based on current system state
143    pub fn create_adaptive_task<D>(&self, fragment: &FragTonnetz<D>) -> Task
144    where
145        D: Driver<Triad>,
146    {
147        // Get system status
148        let capabilities = fragment.calculate_node_capabilities();
149        let avg_memory = capabilities
150            .values()
151            .map(|cap| cap.memory_usage)
152            .sum::<usize>() as f32
153            / capabilities.len().max(1) as f32;
154
155        // Decide what task is most needed
156        if avg_memory > 5000.0 {
157            // If memory usage is high
158            Task::new(
159                self.next_task_id,
160                8,
161                TaskType::OptimizeMemory { max_features: 1000 },
162            )
163        } else {
164            // Check if we need to coordinate learning
165            let learning_nodes = capabilities
166                .values()
167                .filter(|cap| cap.learning_capability)
168                .count();
169
170            if learning_nodes > 1 {
171                Task::new(self.next_task_id, 6, TaskType::CoordinateLearning)
172            } else {
173                // Default to computational optimization
174                Task::new(self.next_task_id, 5, TaskType::ComputeTransformations)
175            }
176        }
177    }
178    /// Create a task for coordinating learning
179    pub fn create_coordinate_learning_task(&self, priority: usize) -> Task {
180        Task::new(self.next_task_id, priority, TaskType::CoordinateLearning)
181    }
182    /// Create a task for node mode change
183    pub fn create_node_mode_change_task(&self, node_id: EdgeId, mode: OperatorKind) -> Task {
184        Task::new(
185            self.next_task_id,
186            7,
187            TaskType::ChangeNodeMode(node_id, mode),
188        )
189    }
190    /// Execute a task on the fragment
191    #[cfg_attr(
192        feature = "tracing",
193        tracing::instrument(skip(self, fragment, task), name = "execute", target = "orchestrator")
194    )]
195    pub fn execute_task<D>(
196        &mut self,
197        fragment: &mut FragTonnetz<D>,
198        task: &Task,
199    ) -> crate::Result<()>
200    where
201        D: TriadDriver,
202    {
203        match &task.kind {
204            TaskType::Transform {
205                node_id,
206                transformation,
207            } => {
208                if let Some(node) = fragment.get_vnode_mut(node_id) {
209                    node.transform(*transformation)?;
210                } else {
211                    return Err(crate::RuntimeError::NodeNotFound);
212                }
213            }
214            TaskType::BatchTransform(operations) => {
215                for (node_id, transforms) in operations {
216                    if let Some(node) = fragment.get_vnode_mut(node_id) {
217                        node.transform_batch(transforms)?;
218                    } else {
219                        return Err(crate::RuntimeError::NodeNotFound);
220                    }
221                }
222            }
223            TaskType::ComputeTransformations => {
224                fragment.compute_transformations();
225            }
226            TaskType::SharePatterns => {
227                self.share_patterns(fragment)?;
228            }
229            TaskType::CoordinateLearning => {
230                fragment.coordinate_learning()?;
231            }
232
233            TaskType::OptimizeMemory { max_features } => {
234                // First collect nodes that need optimization
235                let node_stats: Vec<(EdgeId, usize)> = fragment
236                    .partitions()
237                    .iter()
238                    .map(|(id, node)| (*id, node.total_features()))
239                    .filter(|(_, count)| *count > max_features / 2) // Only optimize if significant features
240                    .collect();
241
242                let mut _optimized = 0;
243                let mut _pruned = 0;
244
245                // Perform optimization on selected nodes
246                for (node_id, feature_count) in node_stats {
247                    if let Some(node) = fragment.get_vnode_mut(&node_id) {
248                        // Adjust max features based on node's actual count
249                        let node_max = std::cmp::min(*max_features, feature_count * 2 / 3);
250
251                        match node.optimize_memory(node_max) {
252                            Ok(stats) => {
253                                _optimized += 1;
254                                _pruned += stats.pruned_count();
255
256                                // Record optimization event
257                                node.store_mut().record_event(
258                                    "memory_optimized",
259                                    Some(vec![stats.features_before(), stats.features_after()]),
260                                );
261                            }
262                            Err(_e) => {
263                                #[cfg(feature = "tracing")]
264                                tracing::error!(
265                                    "Failed to optimize memory for node {node_id}: {_e:?}"
266                                );
267                            }
268                        }
269                    }
270                }
271                #[cfg(feature = "tracing")]
272                tracing::info!(
273                    "Memory optimization: {} nodes optimized, {} features pruned",
274                    _optimized,
275                    _pruned
276                );
277                // Record overall memory optimization with timestamp
278                let timestamp = Timestamp::<u64>::now().to_string();
279
280                fragment.partitions_mut().values_mut().for_each(|node| {
281                    node.store_mut()
282                        .set_property("last_memory_optimization", &timestamp);
283                });
284            }
285            TaskType::BalanceResources => {
286                fragment.balance_resources()?;
287            }
288            TaskType::ChangeNodeMode(node_id, mode) => {
289                fragment.set_node_operator(*node_id, *mode)?;
290            }
291            TaskType::ProcessAgentProposals => {
292                self.process_agent_proposals(fragment)?;
293            }
294            _ => todo!("TaskType is not currently supported"),
295        }
296
297        Ok(())
298    }
299    /// Get task status
300    pub fn get_task_status(&self, task_id: usize) -> Option<&TaskStatus> {
301        self.task_statuses.get(&task_id)
302    }
303    /// Get all tasks that can be executed in parallel
304    pub fn get_parallel_tasks(&self) -> Vec<usize> {
305        self.task_statuses
306            .iter()
307            .filter_map(|(&id, status)| {
308                if *status == TaskStatus::Queued {
309                    Some(id)
310                } else {
311                    None
312                }
313            })
314            .collect()
315    }
316    /// Initialize the orchestrator with the fragment
317    pub fn initialize<D>(&mut self, fragment: &FragTonnetz<D>)
318    where
319        D: Driver<Triad>,
320    {
321        // Register all partitions as active nodes
322        for &edge_id in fragment.partitions().keys() {
323            self.active_nodes.insert(edge_id);
324        }
325    }
326    /// Mark a node as active
327    pub fn mark_node_active(&mut self, node_id: EdgeId) {
328        self.active_nodes.insert(node_id);
329    }
330    /// Mark a node as inactive
331    pub fn mark_node_inactive(&mut self, node_id: EdgeId) {
332        self.active_nodes.remove(&node_id);
333    }
334    /// Monitor node health and resource utilization
335    pub fn monitor_node_health<D>(&mut self, fragment: &FragTonnetz<D>) -> Vec<EdgeId>
336    where
337        D: Driver<Triad>,
338    {
339        let mut unhealthy_nodes = Vec::new();
340        let capabilities = fragment.calculate_node_capabilities();
341
342        // Find nodes with excessive resource usage
343        for (&id, cap) in &capabilities {
344            if cap.memory_usage > 10000 {
345                // Arbitrary high memory threshold
346                unhealthy_nodes.push(id);
347            }
348        }
349
350        unhealthy_nodes
351    }
352    /// Optimize operator distribution across the fragment
353    pub fn optimize_operator_distribution<D>(
354        &mut self,
355        fragment: &mut FragTonnetz<D>,
356    ) -> crate::Result<()>
357    where
358        D: Driver<Triad>,
359    {
360        fragment.optimize_operator_distribution()
361    }
362    /// Process proposed transformations from agents
363    pub fn process_agent_proposals<D>(
364        &mut self,
365        fragment: &mut FragTonnetz<D>,
366    ) -> crate::Result<Vec<usize>>
367    where
368        D: TriadDriver,
369    {
370        // Collect proposals from agent nodes
371        let proposals = fragment.collect_proposed_transformations();
372        if proposals.is_empty() {
373            return Ok(Vec::new());
374        }
375
376        // Create tasks from proposals
377        let mut tasks = Vec::new();
378        for (node_id, transformation) in proposals {
379            let task_id = self.register_task();
380
381            let task = Task::new(
382                task_id,
383                5,
384                TaskType::Transform {
385                    node_id,
386                    transformation,
387                },
388            );
389
390            tasks.push(task);
391        }
392
393        // Execute the tasks
394        let mut completed = Vec::new();
395        for task in tasks {
396            let task_id = task.id;
397            self.update_task_status(task_id, TaskStatus::Running);
398
399            match self.execute_task(fragment, &task) {
400                Ok(_) => {
401                    self.update_task_status(task_id, TaskStatus::Completed);
402                    completed.push(task_id);
403                }
404                Err(e) => {
405                    self.update_task_status(task_id, TaskStatus::Failed(format!("{:?}", e)));
406                }
407            }
408        }
409
410        Ok(completed)
411    }
412    /// Process messages in the queue
413    pub fn process_messages<D>(&mut self, fragment: &mut FragTonnetz<D>) -> crate::Result<()>
414    where
415        D: TriadDriver,
416    {
417        while let Some(message) = self.message_queue.pop_front() {
418            match &message {
419                NodeMessage::TransformRequest {
420                    target, transform, ..
421                } => {
422                    // Apply transformation to target node
423                    if let Some(node) = fragment.get_vnode_mut(target) {
424                        node.transform(*transform)?;
425                    } else {
426                        return Err(crate::RuntimeError::NodeNotFound);
427                    }
428                }
429                NodeMessage::StateSync {
430                    source,
431                    state_hash: _,
432                } => {
433                    // For now, just mark the node as active
434                    self.active_nodes.insert(*source);
435                }
436                NodeMessage::PatternShare {
437                    source,
438                    pattern,
439                    importance: _,
440                } => {
441                    // Share with other nodes (except the source)
442                    for (&edge_id, node) in fragment.partitions_mut() {
443                        if edge_id != *source {
444                            let _ = node.learn_transformation_sequence(pattern);
445                        }
446                    }
447                }
448            }
449        }
450
451        Ok(())
452    }
453    /// Register a new task and return its ID
454    pub fn register_task(&mut self) -> usize {
455        let task_id = self.next_task_id;
456        self.next_task_id += 1;
457
458        self.task_statuses.insert(task_id, TaskStatus::Queued);
459        task_id
460    }
461    /// Share patterns between nodes
462    pub fn share_patterns<D>(&mut self, fragment: &mut FragTonnetz<D>) -> crate::Result<()>
463    where
464        D: Driver<Triad>,
465    {
466        // Get all nodes
467        let node_ids: Vec<_> = fragment.partitions().keys().cloned().collect();
468
469        if node_ids.len() <= 1 {
470            return Ok(()); // Nothing to share
471        }
472
473        // Share patterns from each node with others
474        for i in 0..node_ids.len() {
475            let source_id = node_ids[i];
476
477            // Skip if source doesn't allow sharing
478            if let Some(source_node) = fragment.get_vnode(&source_id) {
479                if !<Operator<f32> as Actor<D, f32>>::allows_pattern_sharing(source_node.operator())
480                {
481                    continue;
482                }
483            } else {
484                continue;
485            }
486
487            for j in 0..node_ids.len() {
488                if i == j {
489                    continue;
490                }
491
492                let target_id = node_ids[j];
493
494                // Skip if target doesn't allow sharing
495                if let Some(target_node) = fragment.get_vnode(&target_id) {
496                    if !<Operator<f32> as Actor<D, f32>>::allows_pattern_sharing(
497                        target_node.operator(),
498                    ) {
499                        continue;
500                    }
501                } else {
502                    continue;
503                }
504                // If both allow sharing, do the sharing
505                if let (Some(source), Some(target)) = (
506                    fragment.clone().get_vnode(&source_id),
507                    fragment.get_vnode_mut(&target_id),
508                ) {
509                    // Create temporary copy
510                    let mut source_copy = VNode::from_driver(source.driver().clone());
511                    source_copy.store_mut().merge(source.store());
512
513                    source_copy.share_patterns(target)?;
514                }
515            }
516        }
517
518        Ok(())
519    }
520
521    /// Schedule tasks for execution
522    pub fn schedule_tasks<D>(
523        &mut self,
524        fragment: &mut FragTonnetz<D>,
525        tasks: Vec<Task>,
526    ) -> crate::Result<Vec<usize>>
527    where
528        D: TriadDriver,
529    {
530        let mut task_ids = Vec::with_capacity(tasks.len());
531
532        for task in tasks {
533            let task_id = self.register_task();
534            self.update_task_status(task_id, TaskStatus::Running);
535
536            match self.execute_task(fragment, &task) {
537                Ok(_) => {
538                    self.update_task_status(task_id, TaskStatus::Completed);
539                }
540                Err(e) => {
541                    self.update_task_status(task_id, TaskStatus::Failed(format!("{:?}", e)));
542                }
543            }
544
545            task_ids.push(task_id);
546        }
547
548        Ok(task_ids)
549    }
550    /// Send a message between nodes
551    pub fn send_message(&mut self, message: NodeMessage) {
552        self.message_queue.push_back(message);
553    }
554    /// Update the status of a task
555    pub fn update_task_status(&mut self, task_id: usize, status: TaskStatus) {
556        match status {
557            TaskStatus::Running => {
558                self.active_tasks.insert(task_id);
559            }
560            TaskStatus::Completed | TaskStatus::Failed(_) => {
561                self.active_tasks.remove(&task_id);
562            }
563            _ => {}
564        }
565
566        self.task_statuses.insert(task_id, status);
567    }
568}
569
570impl<D> Execute<(Task, &mut FragTonnetz<D>)> for Orchestrator
571where
572    D: TriadDriver,
573{
574    type Output = crate::Result<()>;
575
576    fn execute(mut self, params: (Task, &mut FragTonnetz<D>)) -> Self::Output {
577        let (task, fragment) = params;
578        self.update_task_status(task.id, TaskStatus::Running);
579
580        match self.execute_task(fragment, &task) {
581            Ok(_) => {
582                self.update_task_status(task.id, TaskStatus::Completed);
583                Ok(())
584            }
585            Err(err) => {
586                self.update_task_status(task.id, TaskStatus::Failed(format!("{:?}", err)));
587                Err(err)
588            }
589        }
590    }
591}