1use std::collections::{HashMap, HashSet};
8
9use crate::adapter::net::behavior::capability::CapabilityFilter;
10use crate::adapter::net::behavior::loadbalance::{
11 Endpoint, HealthStatus, LoadBalancer, RequestContext, Strategy,
12};
13use crate::adapter::net::behavior::metadata::NodeId;
14use crate::adapter::net::behavior::placement::{Artifact, PlacementFilter, TieBreakContext};
15use crate::adapter::net::compute::daemon::DaemonError;
16use crate::adapter::net::compute::scheduler::{
17 PlacementDecision, PlacementReason, Scheduler, SchedulerError,
18};
19
20#[derive(Debug, Clone)]
24pub struct MemberInfo {
25 pub index: u8,
27 pub origin_hash: u64,
29 pub node_id: u64,
31 pub entity_id_bytes: NodeId,
33 pub healthy: bool,
35}
36
/// Aggregate health of a group, as reported by `GroupCoordinator::health`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupHealth {
    /// Every member is healthy.
    Healthy,
    /// Some, but not all, members are healthy.
    Degraded {
        /// Number of healthy members.
        healthy: u8,
        /// Total number of members.
        total: u8,
    },
    /// No member is healthy.
    Dead,
}
54
/// Errors surfaced by group coordination.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum GroupError {
    /// No member could be selected for routing.
    NoHealthyMember,
    /// Placement did not yield a node; the payload is a human-readable reason.
    PlacementFailed(String),
    /// A registry operation failed; the payload is a human-readable reason.
    RegistryFailed(String),
    /// The supplied configuration was rejected; the payload says why.
    InvalidConfig(String),
}

impl std::fmt::Display for GroupError {
    /// Renders a short lowercase description, followed by the variant's
    /// detail string where it carries one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NoHealthyMember => f.write_str("no healthy member available"),
            Self::PlacementFailed(detail) => write!(f, "placement failed: {}", detail),
            Self::RegistryFailed(detail) => {
                write!(f, "registry operation failed: {}", detail)
            }
            Self::InvalidConfig(detail) => write!(f, "invalid config: {}", detail),
        }
    }
}

// Marker impl: lets `GroupError` be used wherever `dyn std::error::Error` is
// expected. No source error is chained.
impl std::error::Error for GroupError {}
82
83impl From<SchedulerError> for GroupError {
84 fn from(e: SchedulerError) -> Self {
85 Self::PlacementFailed(e.to_string())
86 }
87}
88
89impl From<DaemonError> for GroupError {
90 fn from(e: DaemonError) -> Self {
91 Self::RegistryFailed(e.to_string())
92 }
93}
94
95pub struct GroupCoordinator {
103 pub members: Vec<MemberInfo>,
105 origin_hash_by_entity_id: HashMap<NodeId, u64>,
118 pub lb: LoadBalancer,
120}
121
122impl GroupCoordinator {
123 pub fn new(strategy: Strategy) -> Self {
125 Self {
126 members: Vec::new(),
127 origin_hash_by_entity_id: HashMap::new(),
128 lb: LoadBalancer::with_strategy(strategy),
129 }
130 }
131
132 pub fn add_member(&mut self, info: MemberInfo) {
134 self.lb.add_endpoint(Endpoint::new(info.entity_id_bytes));
135 self.origin_hash_by_entity_id
136 .insert(info.entity_id_bytes, info.origin_hash);
137 self.members.push(info);
138 }
139
140 pub fn remove_last(&mut self) -> Option<MemberInfo> {
145 let info = self.members.pop()?;
146 self.lb.remove_endpoint(&info.entity_id_bytes);
147 self.origin_hash_by_entity_id.remove(&info.entity_id_bytes);
148 Some(info)
149 }
150
151 pub fn route_event(&self, ctx: &RequestContext) -> Result<u64, GroupError> {
155 let selection = self
156 .lb
157 .select(ctx)
158 .map_err(|_| GroupError::NoHealthyMember)?;
159
160 self.origin_hash_for_entity_id(&selection.node_id)
161 .ok_or(GroupError::NoHealthyMember)
162 }
163
164 pub fn mark_unhealthy(&mut self, index: u8) {
166 if let Some(member) = self.members.get_mut(index as usize) {
175 if member.index == index {
176 member.healthy = false;
177 self.lb
178 .update_health(&member.entity_id_bytes, HealthStatus::Unhealthy);
179 }
180 }
181 }
182
183 pub fn mark_healthy(&mut self, index: u8) {
185 if let Some(member) = self.members.get_mut(index as usize) {
188 if member.index == index {
189 member.healthy = true;
190 self.lb
191 .update_health(&member.entity_id_bytes, HealthStatus::Healthy);
192 }
193 }
194 }
195
196 pub fn update_member_placement(
200 &mut self,
201 index: u8,
202 new_node_id: u64,
203 new_entity_id_bytes: NodeId,
204 ) {
205 if let Some(member) = self.members.get_mut(index as usize) {
207 if member.index != index {
208 return;
209 }
210 self.lb.remove_endpoint(&member.entity_id_bytes);
212 let old_entity_id = member.entity_id_bytes;
213 let origin_hash = member.origin_hash;
214 member.node_id = new_node_id;
215 member.entity_id_bytes = new_entity_id_bytes;
216 member.healthy = true;
217 self.lb.add_endpoint(Endpoint::new(new_entity_id_bytes));
218 self.origin_hash_by_entity_id.remove(&old_entity_id);
222 self.origin_hash_by_entity_id
223 .insert(new_entity_id_bytes, origin_hash);
224 }
225 }
226
227 pub fn on_node_recovery(
232 &mut self,
233 recovered_node_id: u64,
234 registry: &crate::adapter::net::compute::registry::DaemonRegistry,
235 ) {
236 for member in &mut self.members {
237 if member.node_id == recovered_node_id
238 && !member.healthy
239 && registry.contains(member.origin_hash)
240 {
241 member.healthy = true;
242 self.lb
243 .update_health(&member.entity_id_bytes, HealthStatus::Healthy);
244 }
245 }
246 }
247
248 pub fn health(&self) -> GroupHealth {
250 let healthy_count = self.members.iter().filter(|m| m.healthy).count();
251 let total_count = self.members.len();
252 let healthy = healthy_count.min(u8::MAX as usize) as u8;
254 let total = total_count.min(u8::MAX as usize) as u8;
255 if healthy_count == 0 {
256 GroupHealth::Dead
257 } else if healthy_count == total_count {
258 GroupHealth::Healthy
259 } else {
260 GroupHealth::Degraded { healthy, total }
261 }
262 }
263
264 pub fn members(&self) -> &[MemberInfo] {
266 &self.members
267 }
268
269 pub fn member_count(&self) -> u8 {
271 self.members.len().min(u8::MAX as usize) as u8
272 }
273
274 pub fn healthy_count(&self) -> u8 {
276 self.members
277 .iter()
278 .filter(|m| m.healthy)
279 .count()
280 .min(u8::MAX as usize) as u8
281 }
282
283 pub fn members_on_node(&self, node_id: u64) -> Vec<u8> {
285 self.members
286 .iter()
287 .filter(|m| m.node_id == node_id)
288 .map(|m| m.index)
289 .collect()
290 }
291
292 fn origin_hash_for_entity_id(&self, entity_id: &NodeId) -> Option<u64> {
298 self.origin_hash_by_entity_id.get(entity_id).copied()
299 }
300
301 pub fn place_with_spread(
303 scheduler: &Scheduler,
304 requirements: &CapabilityFilter,
305 exclude: &HashSet<u64>,
306 ) -> Result<PlacementDecision, GroupError> {
307 let placement = scheduler.place(requirements)?;
308 if !exclude.contains(&placement.node_id) {
309 return Ok(placement);
310 }
311 let candidates = scheduler.query_candidates(requirements);
313 for node_id in candidates {
314 if !exclude.contains(&node_id) {
315 return Ok(PlacementDecision {
316 node_id,
317 reason: PlacementReason::FirstMatch,
318 });
319 }
320 }
321 Err(GroupError::PlacementFailed(
323 "all candidate nodes are excluded by spread constraint".into(),
324 ))
325 }
326
327 pub fn place_member(
348 scheduler: &Scheduler,
349 artifact: &Artifact<'_>,
350 requirements: &CapabilityFilter,
351 exclude: &HashSet<u64>,
352 placement: &dyn PlacementFilter,
353 tie_break: &TieBreakContext<'_>,
354 ) -> Result<PlacementDecision, GroupError> {
355 let node_id = scheduler
356 .select_member_node(artifact, requirements, exclude, placement, tie_break)
357 .ok_or_else(|| {
358 GroupError::PlacementFailed(
359 "no candidate satisfied placement filter (every node excluded or vetoed)"
360 .into(),
361 )
362 })?;
363 Ok(PlacementDecision {
364 node_id,
365 reason: PlacementReason::BestScore,
366 })
367 }
368}
369
370impl std::fmt::Debug for GroupCoordinator {
371 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
372 f.debug_struct("GroupCoordinator")
373 .field("members", &self.members.len())
374 .field("healthy", &self.healthy_count())
375 .finish()
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::behavior::capability::{CapabilityAnnouncement, CapabilitySet};
    use crate::adapter::net::behavior::placement::{NodeId as PlacementNodeId, ResourceAxis};
    use std::sync::Arc;

    /// Node ids announced by most placement tests, lowest first.
    const NODES: [u64; 3] = [0x1111, 0x2222, 0x3333];

    /// Builds a scheduler that knows about `node_ids`, each announced with an
    /// empty capability set. The first id (or `0xFFFF` when the slice is
    /// empty) is passed to the scheduler as the local node.
    fn make_scheduler(node_ids: &[u64]) -> Scheduler {
        use crate::adapter::net::behavior::fold::{capability_bridge, CapabilityFold, Fold};

        let fold: Arc<Fold<CapabilityFold>> =
            Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
        let eid = crate::adapter::net::identity::EntityId::from_bytes([0u8; 32]);
        for &id in node_ids {
            let announcement =
                CapabilityAnnouncement::new(id, eid.clone(), 1, CapabilitySet::new());
            capability_bridge::apply_legacy_announcement(&fold, announcement)
                .expect("apply legacy announcement in fixture");
        }
        let local = match node_ids.first() {
            Some(&first) => first,
            None => 0xFFFF,
        };
        Scheduler::new(fold, local, CapabilitySet::new())
    }

    /// Placement filter that gives every node the same score, except for the
    /// nodes listed in `veto`, which it rejects.
    struct FixedScore {
        score: f32,
        veto: Vec<u64>,
    }

    impl PlacementFilter for FixedScore {
        fn placement_score(
            &self,
            target: &PlacementNodeId,
            _artifact: &Artifact<'_>,
        ) -> Option<f32> {
            if self.veto.contains(target) {
                return None;
            }
            Some(self.score)
        }
    }

    fn empty_caps() -> CapabilitySet {
        CapabilitySet::new()
    }

    fn daemon_artifact<'a>(
        required: &'a CapabilitySet,
        optional: &'a CapabilitySet,
    ) -> Artifact<'a> {
        Artifact::Daemon {
            daemon_id: [0u8; 32],
            required,
            optional,
        }
    }

    /// Tie-break context shared by every placement test: no RTT lookup,
    /// compute resource axis.
    fn compute_tie_break() -> TieBreakContext<'static> {
        TieBreakContext {
            rtt_lookup: None,
            resource_axis: ResourceAxis::Compute,
        }
    }

    /// Runs `GroupCoordinator::place_member` with the fixture defaults used
    /// by every test: a daemon artifact with empty required/optional
    /// capabilities, a default capability filter, and `compute_tie_break()`.
    fn place(
        sched: &Scheduler,
        exclude: &HashSet<u64>,
        filter: &dyn PlacementFilter,
    ) -> Result<PlacementDecision, GroupError> {
        let required = empty_caps();
        let optional = empty_caps();
        let artifact = daemon_artifact(&required, &optional);
        let tie_break = compute_tie_break();
        GroupCoordinator::place_member(
            sched,
            &artifact,
            &CapabilityFilter::default(),
            exclude,
            filter,
            &tie_break,
        )
    }

    /// With three equally scored candidates the decision carries the
    /// `BestScore` reason and lands on 0x1111.
    #[test]
    fn place_member_stamps_best_score_reason() {
        let sched = make_scheduler(&NODES);
        let filter = FixedScore {
            score: 0.7,
            veto: Vec::new(),
        };
        let exclude = HashSet::new();

        let decision = place(&sched, &exclude, &filter)
            .expect("placement should succeed with three eligible candidates");

        assert_eq!(decision.reason, PlacementReason::BestScore);
        assert_eq!(decision.node_id, 0x1111);
    }

    /// A node listed in `exclude` is skipped; the next candidate is chosen.
    #[test]
    fn place_member_excludes_already_placed_nodes() {
        let sched = make_scheduler(&NODES);
        let filter = FixedScore {
            score: 0.5,
            veto: Vec::new(),
        };
        let exclude = HashSet::from([0x1111u64]);

        let decision = place(&sched, &exclude, &filter)
            .expect("placement should succeed with two remaining candidates");

        assert_eq!(decision.reason, PlacementReason::BestScore);
        assert_eq!(decision.node_id, 0x2222);
    }

    /// Two nodes excluded plus the third vetoed leaves nothing to place on.
    #[test]
    fn place_member_returns_placement_failed_when_all_vetoed_or_excluded() {
        let sched = make_scheduler(&NODES);
        let filter = FixedScore {
            score: 1.0,
            veto: vec![0x2222],
        };
        let exclude = HashSet::from([0x1111u64, 0x3333u64]);

        let err = place(&sched, &exclude, &filter).expect_err("every candidate is filtered out");

        let msg = match err {
            GroupError::PlacementFailed(msg) => msg,
            other => panic!("expected PlacementFailed, got {other:?}"),
        };
        assert!(
            msg.contains("excluded or vetoed"),
            "error message should explain why placement failed: {msg}"
        );
    }

    /// The highest score wins even when a lower node id is available.
    #[test]
    fn place_member_picks_highest_scoring_over_lex_order() {
        /// Scores 0x2222 highest, 0x3333 second, 0x1111 lowest.
        struct ScoredFilter;

        impl PlacementFilter for ScoredFilter {
            fn placement_score(&self, target: &PlacementNodeId, _: &Artifact<'_>) -> Option<f32> {
                let score = match *target {
                    0x1111 => 0.1,
                    0x2222 => 0.9,
                    0x3333 => 0.5,
                    _ => 0.0,
                };
                Some(score)
            }
        }

        let sched = make_scheduler(&NODES);
        let exclude = HashSet::new();

        let decision = place(&sched, &exclude, &ScoredFilter).expect("placement should succeed");

        assert_eq!(decision.reason, PlacementReason::BestScore);
        assert_eq!(
            decision.node_id, 0x2222,
            "highest scorer wins, NOT the lex-lowest"
        );
    }

    /// A scheduler with no announced nodes cannot place anything.
    #[test]
    fn place_member_returns_placement_failed_for_empty_index() {
        let sched = make_scheduler(&[]);
        let filter = FixedScore {
            score: 1.0,
            veto: Vec::new(),
        };
        let exclude = HashSet::new();

        let err = place(&sched, &exclude, &filter).expect_err("empty index → placement failure");

        assert!(matches!(err, GroupError::PlacementFailed(_)));
    }

    /// The spread-based path still returns the local node with the
    /// `LocalPreferred` reason when nothing is excluded.
    #[test]
    fn place_with_spread_unchanged_after_place_member_added() {
        let sched = make_scheduler(&NODES);
        let exclude = HashSet::new();

        let decision =
            GroupCoordinator::place_with_spread(&sched, &CapabilityFilter::default(), &exclude)
                .expect("legacy path still works");

        assert_eq!(decision.node_id, 0x1111);
        assert_eq!(decision.reason, PlacementReason::LocalPreferred);
    }

    /// Builds a healthy member whose entity id is all zeroes except for the
    /// first byte, which holds `index`, and whose node id is `0xA000 | index`.
    fn make_member(index: u8, origin_hash: u64) -> MemberInfo {
        let mut entity = [0u8; 32];
        entity[0] = index;
        MemberInfo {
            index,
            origin_hash,
            node_id: 0xA000 | u64::from(index),
            entity_id_bytes: entity,
            healthy: true,
        }
    }

    /// The entity-id -> origin-hash index must follow add, remove and
    /// re-placement of members.
    #[test]
    fn origin_hash_lookup_uses_reverse_index_across_mutations() {
        let mut coord = GroupCoordinator::new(Strategy::RoundRobin);
        let originals = [
            make_member(0, 0xA1),
            make_member(1, 0xA2),
            make_member(2, 0xA3),
        ];
        for member in &originals {
            coord.add_member(member.clone());
        }

        // Every added member resolves to its own origin hash.
        for member in &originals {
            assert_eq!(
                coord.origin_hash_for_entity_id(&member.entity_id_bytes),
                Some(member.origin_hash)
            );
        }

        // An entity id that was never added resolves to nothing.
        let mut stranger = [0u8; 32];
        stranger[0] = 0xFE;
        assert_eq!(coord.origin_hash_for_entity_id(&stranger), None);

        // Popping the tail drops its reverse-index entry.
        let removed = coord.remove_last().expect("pop returns the tail");
        assert_eq!(removed.origin_hash, 0xA3);
        assert_eq!(
            coord.origin_hash_for_entity_id(&originals[2].entity_id_bytes),
            None,
            "remove_last must clear the reverse-index entry"
        );

        // Re-placing member 0 re-keys its entry to the new entity id.
        let mut new_entity = [0u8; 32];
        new_entity[0] = 0xC0;
        new_entity[1] = 0x01;
        coord.update_member_placement(0, 0xBEEF, new_entity);
        assert_eq!(
            coord.origin_hash_for_entity_id(&originals[0].entity_id_bytes),
            None,
            "stale entity_id must be evicted from the reverse index"
        );
        assert_eq!(
            coord.origin_hash_for_entity_id(&new_entity),
            Some(0xA1),
            "new entity_id must resolve to the same origin_hash"
        );
    }

    /// Health toggles hit the member at the given index, and an unknown
    /// index leaves every member untouched.
    #[test]
    fn mark_health_resolves_via_direct_index() {
        let mut coord = GroupCoordinator::new(Strategy::RoundRobin);
        for (index, origin_hash) in [(0u8, 0xA1u64), (1, 0xA2), (2, 0xA3)] {
            coord.add_member(make_member(index, origin_hash));
        }

        assert!(coord.members[1].healthy);

        coord.mark_unhealthy(1);
        assert!(!coord.members[1].healthy);

        coord.mark_healthy(1);
        assert!(coord.members[1].healthy);

        // Out-of-range index: nothing changes.
        coord.mark_unhealthy(99);
        for position in 0..3 {
            assert!(coord.members[position].healthy);
        }
    }
}