myko_server/
server_ownership.rs1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use myko::{
5 entities::server::{GetAllServers, ServerId},
6 event::EventOptions,
7 relationship::iter_server_owned_registrations,
8 server::{CellServerCtx, PersistError},
9};
10
11pub struct ServerOwnershipManager;
12
13impl ServerOwnershipManager {
14 fn live_server_ids(ctx: &CellServerCtx) -> Vec<ServerId> {
16 use hyphae::Gettable;
17 let req = ctx.new_server_transaction();
18 ctx.query_map(GetAllServers {}, req)
19 .items()
20 .get()
21 .iter()
22 .map(|s| s.id.clone())
23 .collect()
24 }
25
26 fn count_distribution(ctx: &CellServerCtx) -> HashMap<Arc<str>, usize> {
29 let mut counts: HashMap<Arc<str>, usize> = HashMap::new();
30
31 for reg in iter_server_owned_registrations() {
32 let store = ctx.registry.get_or_create(reg.entity_type);
33 for (_, item) in store.snapshot() {
34 if let Some(owner) = item.server_owner() {
35 *counts.entry(Arc::from(owner)).or_default() += 1;
36 }
37 }
38 }
39 counts
40 }
41
42 fn least_loaded(live_ids: &[ServerId], counts: &HashMap<Arc<str>, usize>) -> Option<ServerId> {
44 live_ids
45 .iter()
46 .min_by_key(|id| counts.get(id.0.as_ref()).copied().unwrap_or(0))
47 .cloned()
48 }
49
50 pub fn claim_orphaned(ctx: &CellServerCtx) -> Result<(), PersistError> {
52 let live_ids = Self::live_server_ids(ctx);
53 if live_ids.is_empty() {
54 log::warn!("[ServerOwnership] No live servers found, skipping orphan claim");
55 return Ok(());
56 }
57
58 let live_set: HashSet<&str> = live_ids.iter().map(|id| id.0.as_ref()).collect();
59 let mut counts = Self::count_distribution(ctx);
60 counts.retain(|k, _| live_set.contains(k.as_ref()));
61
62 let mut reassigned = 0usize;
63
64 for reg in iter_server_owned_registrations() {
65 let store = ctx.registry.get_or_create(reg.entity_type);
66 let items: Vec<_> = store.snapshot().into_iter().map(|(_, item)| item).collect();
67
68 for item in &items {
69 let current_owner = item.server_owner().unwrap_or("");
70
71 if !current_owner.is_empty() && live_set.contains(current_owner) {
72 continue; }
74
75 let Some(new_owner) = Self::least_loaded(&live_ids, &counts) else {
76 continue;
77 };
78
79 if let Some(patched) = item.bake_server_owner(&new_owner.0) {
80 ctx.set_dyn_with_options(
81 patched,
82 Some(EventOptions {
83 prevent_relationship_updates: true,
84 ..Default::default()
85 }),
86 )?;
87 *counts.entry(new_owner.0.clone()).or_default() += 1;
88 reassigned += 1;
89 }
90 }
91 }
92
93 if reassigned > 0 {
94 log::info!(
95 "[ServerOwnership] Reassigned {} orphaned item(s)",
96 reassigned
97 );
98 }
99 Ok(())
100 }
101
102 pub fn watch_peer_deaths(ctx: &CellServerCtx) -> hyphae::SubscriptionGuard {
105 use hyphae::{Gettable, Signal, Watchable};
106
107 let req = ctx.new_server_transaction();
108 let servers_cell = ctx.query_map(GetAllServers {}, req).items();
109
110 let prev_ids: std::sync::Mutex<HashSet<Arc<str>>> =
111 std::sync::Mutex::new(servers_cell.get().iter().map(|s| s.id.0.clone()).collect());
112
113 let ctx = ctx.clone();
114 servers_cell.subscribe(move |signal| {
115 let Signal::Value(servers) = signal else {
116 return;
117 };
118
119 let current_ids: HashSet<Arc<str>> = servers.iter().map(|s| s.id.0.clone()).collect();
120
121 let mut prev = prev_ids.lock().unwrap();
122 let removed: Vec<Arc<str>> = prev.difference(¤t_ids).cloned().collect();
123 *prev = current_ids;
124 drop(prev);
125
126 if removed.is_empty() {
127 return;
128 }
129
130 for id in &removed {
131 log::warn!(
132 "[ServerOwnership] Server {} left cluster, redistributing",
133 id
134 );
135 }
136
137 if let Err(e) = ServerOwnershipManager::claim_orphaned(&ctx) {
138 log::error!("[ServerOwnership] Failed to redistribute: {}", e);
139 }
140 })
141 }
142}