1use crate::NodeMessage;
6use crate::frag::FragTonnetz;
7use crate::tasks::{Task, TaskStatus, TaskType};
8use eryon::ops::Execute;
9use eryon_actors::prelude::{Actor, Driver, Operator, OperatorKind, TriadDriver, VNode};
10use rshyper::EdgeId;
11use rstmt::nrt::Triad;
12use scsys::Timestamp;
13use std::collections::{HashMap, HashSet, VecDeque};
14
/// Bookkeeping state used to coordinate tasks, nodes and messages for a fragment.
///
/// The struct itself only stores identifiers, statuses and queued messages; the fragment
/// being orchestrated is passed to the individual methods that need it.
#[derive(Clone, Debug)]
pub struct Orchestrator {
    /// Most recently recorded status of every registered task, keyed by task id.
    pub(crate) task_statuses: HashMap<usize, TaskStatus>,
    /// Ids of tasks whose last recorded status is `Running`.
    pub(crate) active_tasks: HashSet<usize>,
    /// Ids of the nodes currently marked as active.
    pub(crate) active_nodes: HashSet<EdgeId>,
    /// Messages waiting to be processed; appended at the back and consumed from the front.
    pub(crate) message_queue: VecDeque<NodeMessage>,
    /// Id that the next call to `register_task` will hand out.
    pub(crate) next_task_id: usize,
}
31
32impl Default for Orchestrator {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl Orchestrator {
39 pub fn new() -> Self {
41 Orchestrator {
42 task_statuses: HashMap::new(),
43 active_tasks: HashSet::new(),
44 active_nodes: HashSet::new(),
45 message_queue: VecDeque::new(),
46 next_task_id: 0,
47 }
48 }
49 pub fn active_node_count(&self) -> usize {
51 self.active_nodes.len()
52 }
53 pub fn active_task_count(&self) -> usize {
55 self.active_tasks.len()
56 }
57 pub fn is_node_active(&self, node_id: EdgeId) -> bool {
59 self.active_nodes.contains(&node_id)
60 }
61}
62
63impl Orchestrator {
64 pub fn active_node_ids(&self) -> Vec<EdgeId> {
66 self.active_nodes.iter().copied().collect()
67 }
68 pub fn coordinate_knowledge_sharing<D>(
70 &mut self,
71 fragment: &mut FragTonnetz<D>,
72 ) -> crate::Result<()>
73 where
74 D: TriadDriver,
75 {
76 let nodes_with_knowledge: Vec<_> = fragment
78 .partitions()
79 .iter()
80 .filter(|(_, node)| node.has_surface_network() && node.total_features() > 0)
81 .map(|(&id, _)| id)
82 .collect();
83
84 if nodes_with_knowledge.len() <= 1 {
85 return Ok(()); }
87
88 let mut extracted_patterns = Vec::with_capacity(nodes_with_knowledge.len());
90
91 for &node_id in &nodes_with_knowledge {
92 if let Some(node) = fragment.get_vnode(&node_id) {
93 match node.extract_knowledge_patterns() {
94 Ok(patterns) if !patterns.is_empty() => {
95 extracted_patterns.push((node_id, patterns));
96 }
97 _ => continue,
98 }
99 }
100 }
101
102 let max_targets_per_source = 5;
105
106 for (source_id, patterns) in extracted_patterns {
108 if patterns.is_empty() {
110 continue;
111 }
112
113 let mut target_count = 0;
115
116 for &target_id in &nodes_with_knowledge {
117 if target_id != source_id && target_count < max_targets_per_source {
118 if let Some(target_node) = fragment.get_vnode_mut(&target_id) {
119 if let Err(_e) =
120 target_node.integrate_external_knowledge(&patterns, &source_id)
121 {
122 #[cfg(feature = "tracing")]
123 tracing::warn!("Failed to integrate knowledge: {:?}", _e);
124 }
125
126 target_count += 1;
127 }
128 }
129 }
130 }
131
132 for node in fragment.partitions_mut().values_mut() {
134 node.store_mut().record_event(
135 "knowledge_coordination",
136 Some(vec![nodes_with_knowledge.len()]),
137 );
138 }
139
140 Ok(())
141 }
142 pub fn create_adaptive_task<D>(&self, fragment: &FragTonnetz<D>) -> Task
144 where
145 D: Driver<Triad>,
146 {
147 let capabilities = fragment.calculate_node_capabilities();
149 let avg_memory = capabilities
150 .values()
151 .map(|cap| cap.memory_usage)
152 .sum::<usize>() as f32
153 / capabilities.len().max(1) as f32;
154
155 if avg_memory > 5000.0 {
157 Task::new(
159 self.next_task_id,
160 8,
161 TaskType::OptimizeMemory { max_features: 1000 },
162 )
163 } else {
164 let learning_nodes = capabilities
166 .values()
167 .filter(|cap| cap.learning_capability)
168 .count();
169
170 if learning_nodes > 1 {
171 Task::new(self.next_task_id, 6, TaskType::CoordinateLearning)
172 } else {
173 Task::new(self.next_task_id, 5, TaskType::ComputeTransformations)
175 }
176 }
177 }
178 pub fn create_coordinate_learning_task(&self, priority: usize) -> Task {
180 Task::new(self.next_task_id, priority, TaskType::CoordinateLearning)
181 }
182 pub fn create_node_mode_change_task(&self, node_id: EdgeId, mode: OperatorKind) -> Task {
184 Task::new(
185 self.next_task_id,
186 7,
187 TaskType::ChangeNodeMode(node_id, mode),
188 )
189 }
190 #[cfg_attr(
192 feature = "tracing",
193 tracing::instrument(skip(self, fragment, task), name = "execute", target = "orchestrator")
194 )]
195 pub fn execute_task<D>(
196 &mut self,
197 fragment: &mut FragTonnetz<D>,
198 task: &Task,
199 ) -> crate::Result<()>
200 where
201 D: TriadDriver,
202 {
203 match &task.kind {
204 TaskType::Transform {
205 node_id,
206 transformation,
207 } => {
208 if let Some(node) = fragment.get_vnode_mut(node_id) {
209 node.transform(*transformation)?;
210 } else {
211 return Err(crate::RuntimeError::NodeNotFound);
212 }
213 }
214 TaskType::BatchTransform(operations) => {
215 for (node_id, transforms) in operations {
216 if let Some(node) = fragment.get_vnode_mut(node_id) {
217 node.transform_batch(transforms)?;
218 } else {
219 return Err(crate::RuntimeError::NodeNotFound);
220 }
221 }
222 }
223 TaskType::ComputeTransformations => {
224 fragment.compute_transformations();
225 }
226 TaskType::SharePatterns => {
227 self.share_patterns(fragment)?;
228 }
229 TaskType::CoordinateLearning => {
230 fragment.coordinate_learning()?;
231 }
232
233 TaskType::OptimizeMemory { max_features } => {
234 let node_stats: Vec<(EdgeId, usize)> = fragment
236 .partitions()
237 .iter()
238 .map(|(id, node)| (*id, node.total_features()))
239 .filter(|(_, count)| *count > max_features / 2) .collect();
241
242 let mut _optimized = 0;
243 let mut _pruned = 0;
244
245 for (node_id, feature_count) in node_stats {
247 if let Some(node) = fragment.get_vnode_mut(&node_id) {
248 let node_max = std::cmp::min(*max_features, feature_count * 2 / 3);
250
251 match node.optimize_memory(node_max) {
252 Ok(stats) => {
253 _optimized += 1;
254 _pruned += stats.pruned_count();
255
256 node.store_mut().record_event(
258 "memory_optimized",
259 Some(vec![stats.features_before(), stats.features_after()]),
260 );
261 }
262 Err(_e) => {
263 #[cfg(feature = "tracing")]
264 tracing::error!(
265 "Failed to optimize memory for node {node_id}: {_e:?}"
266 );
267 }
268 }
269 }
270 }
271 #[cfg(feature = "tracing")]
272 tracing::info!(
273 "Memory optimization: {} nodes optimized, {} features pruned",
274 _optimized,
275 _pruned
276 );
277 let timestamp = Timestamp::<u64>::now().to_string();
279
280 fragment.partitions_mut().values_mut().for_each(|node| {
281 node.store_mut()
282 .set_property("last_memory_optimization", ×tamp);
283 });
284 }
285 TaskType::BalanceResources => {
286 fragment.balance_resources()?;
287 }
288 TaskType::ChangeNodeMode(node_id, mode) => {
289 fragment.set_node_operator(*node_id, *mode)?;
290 }
291 TaskType::ProcessAgentProposals => {
292 self.process_agent_proposals(fragment)?;
293 }
294 _ => todo!("TaskType is not currently supported"),
295 }
296
297 Ok(())
298 }
299 pub fn get_task_status(&self, task_id: usize) -> Option<&TaskStatus> {
301 self.task_statuses.get(&task_id)
302 }
303 pub fn get_parallel_tasks(&self) -> Vec<usize> {
305 self.task_statuses
306 .iter()
307 .filter_map(|(&id, status)| {
308 if *status == TaskStatus::Queued {
309 Some(id)
310 } else {
311 None
312 }
313 })
314 .collect()
315 }
316 pub fn initialize<D>(&mut self, fragment: &FragTonnetz<D>)
318 where
319 D: Driver<Triad>,
320 {
321 for &edge_id in fragment.partitions().keys() {
323 self.active_nodes.insert(edge_id);
324 }
325 }
326 pub fn mark_node_active(&mut self, node_id: EdgeId) {
328 self.active_nodes.insert(node_id);
329 }
330 pub fn mark_node_inactive(&mut self, node_id: EdgeId) {
332 self.active_nodes.remove(&node_id);
333 }
334 pub fn monitor_node_health<D>(&mut self, fragment: &FragTonnetz<D>) -> Vec<EdgeId>
336 where
337 D: Driver<Triad>,
338 {
339 let mut unhealthy_nodes = Vec::new();
340 let capabilities = fragment.calculate_node_capabilities();
341
342 for (&id, cap) in &capabilities {
344 if cap.memory_usage > 10000 {
345 unhealthy_nodes.push(id);
347 }
348 }
349
350 unhealthy_nodes
351 }
352 pub fn optimize_operator_distribution<D>(
354 &mut self,
355 fragment: &mut FragTonnetz<D>,
356 ) -> crate::Result<()>
357 where
358 D: Driver<Triad>,
359 {
360 fragment.optimize_operator_distribution()
361 }
362 pub fn process_agent_proposals<D>(
364 &mut self,
365 fragment: &mut FragTonnetz<D>,
366 ) -> crate::Result<Vec<usize>>
367 where
368 D: TriadDriver,
369 {
370 let proposals = fragment.collect_proposed_transformations();
372 if proposals.is_empty() {
373 return Ok(Vec::new());
374 }
375
376 let mut tasks = Vec::new();
378 for (node_id, transformation) in proposals {
379 let task_id = self.register_task();
380
381 let task = Task::new(
382 task_id,
383 5,
384 TaskType::Transform {
385 node_id,
386 transformation,
387 },
388 );
389
390 tasks.push(task);
391 }
392
393 let mut completed = Vec::new();
395 for task in tasks {
396 let task_id = task.id;
397 self.update_task_status(task_id, TaskStatus::Running);
398
399 match self.execute_task(fragment, &task) {
400 Ok(_) => {
401 self.update_task_status(task_id, TaskStatus::Completed);
402 completed.push(task_id);
403 }
404 Err(e) => {
405 self.update_task_status(task_id, TaskStatus::Failed(format!("{:?}", e)));
406 }
407 }
408 }
409
410 Ok(completed)
411 }
412 pub fn process_messages<D>(&mut self, fragment: &mut FragTonnetz<D>) -> crate::Result<()>
414 where
415 D: TriadDriver,
416 {
417 while let Some(message) = self.message_queue.pop_front() {
418 match &message {
419 NodeMessage::TransformRequest {
420 target, transform, ..
421 } => {
422 if let Some(node) = fragment.get_vnode_mut(target) {
424 node.transform(*transform)?;
425 } else {
426 return Err(crate::RuntimeError::NodeNotFound);
427 }
428 }
429 NodeMessage::StateSync {
430 source,
431 state_hash: _,
432 } => {
433 self.active_nodes.insert(*source);
435 }
436 NodeMessage::PatternShare {
437 source,
438 pattern,
439 importance: _,
440 } => {
441 for (&edge_id, node) in fragment.partitions_mut() {
443 if edge_id != *source {
444 let _ = node.learn_transformation_sequence(pattern);
445 }
446 }
447 }
448 }
449 }
450
451 Ok(())
452 }
453 pub fn register_task(&mut self) -> usize {
455 let task_id = self.next_task_id;
456 self.next_task_id += 1;
457
458 self.task_statuses.insert(task_id, TaskStatus::Queued);
459 task_id
460 }
461 pub fn share_patterns<D>(&mut self, fragment: &mut FragTonnetz<D>) -> crate::Result<()>
463 where
464 D: Driver<Triad>,
465 {
466 let node_ids: Vec<_> = fragment.partitions().keys().cloned().collect();
468
469 if node_ids.len() <= 1 {
470 return Ok(()); }
472
473 for i in 0..node_ids.len() {
475 let source_id = node_ids[i];
476
477 if let Some(source_node) = fragment.get_vnode(&source_id) {
479 if !<Operator<f32> as Actor<D, f32>>::allows_pattern_sharing(source_node.operator())
480 {
481 continue;
482 }
483 } else {
484 continue;
485 }
486
487 for j in 0..node_ids.len() {
488 if i == j {
489 continue;
490 }
491
492 let target_id = node_ids[j];
493
494 if let Some(target_node) = fragment.get_vnode(&target_id) {
496 if !<Operator<f32> as Actor<D, f32>>::allows_pattern_sharing(
497 target_node.operator(),
498 ) {
499 continue;
500 }
501 } else {
502 continue;
503 }
504 if let (Some(source), Some(target)) = (
506 fragment.clone().get_vnode(&source_id),
507 fragment.get_vnode_mut(&target_id),
508 ) {
509 let mut source_copy = VNode::from_driver(source.driver().clone());
511 source_copy.store_mut().merge(source.store());
512
513 source_copy.share_patterns(target)?;
514 }
515 }
516 }
517
518 Ok(())
519 }
520
521 pub fn schedule_tasks<D>(
523 &mut self,
524 fragment: &mut FragTonnetz<D>,
525 tasks: Vec<Task>,
526 ) -> crate::Result<Vec<usize>>
527 where
528 D: TriadDriver,
529 {
530 let mut task_ids = Vec::with_capacity(tasks.len());
531
532 for task in tasks {
533 let task_id = self.register_task();
534 self.update_task_status(task_id, TaskStatus::Running);
535
536 match self.execute_task(fragment, &task) {
537 Ok(_) => {
538 self.update_task_status(task_id, TaskStatus::Completed);
539 }
540 Err(e) => {
541 self.update_task_status(task_id, TaskStatus::Failed(format!("{:?}", e)));
542 }
543 }
544
545 task_ids.push(task_id);
546 }
547
548 Ok(task_ids)
549 }
550 pub fn send_message(&mut self, message: NodeMessage) {
552 self.message_queue.push_back(message);
553 }
554 pub fn update_task_status(&mut self, task_id: usize, status: TaskStatus) {
556 match status {
557 TaskStatus::Running => {
558 self.active_tasks.insert(task_id);
559 }
560 TaskStatus::Completed | TaskStatus::Failed(_) => {
561 self.active_tasks.remove(&task_id);
562 }
563 _ => {}
564 }
565
566 self.task_statuses.insert(task_id, status);
567 }
568}
569
570impl<D> Execute<(Task, &mut FragTonnetz<D>)> for Orchestrator
571where
572 D: TriadDriver,
573{
574 type Output = crate::Result<()>;
575
576 fn execute(mut self, params: (Task, &mut FragTonnetz<D>)) -> Self::Output {
577 let (task, fragment) = params;
578 self.update_task_status(task.id, TaskStatus::Running);
579
580 match self.execute_task(fragment, &task) {
581 Ok(_) => {
582 self.update_task_status(task.id, TaskStatus::Completed);
583 Ok(())
584 }
585 Err(err) => {
586 self.update_task_status(task.id, TaskStatus::Failed(format!("{:?}", err)));
587 Err(err)
588 }
589 }
590 }
591}