myko_server/
server_ownership.rs1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4};
5
6use myko::{
7 entities::server::{GetAllServers, ServerId},
8 event::EventOptions,
9 relationship::iter_server_owned_registrations,
10 server::{CellServerCtx, PersistError},
11};
12
13pub struct ServerOwnershipManager;
14
15impl ServerOwnershipManager {
16 fn live_server_ids(ctx: &CellServerCtx) -> Vec<ServerId> {
18 use hyphae::Gettable;
19 let req = ctx.new_server_transaction();
20 ctx.query_map(GetAllServers {}, req)
21 .items()
22 .get()
23 .iter()
24 .map(|s| s.id.clone())
25 .collect()
26 }
27
28 fn count_distribution(ctx: &CellServerCtx) -> HashMap<Arc<str>, usize> {
31 let mut counts: HashMap<Arc<str>, usize> = HashMap::new();
32
33 for reg in iter_server_owned_registrations() {
34 let store = ctx.registry.get_or_create(reg.entity_type);
35 for (_, item) in store.snapshot() {
36 if let Some(owner) = item.server_owner() {
37 *counts.entry(Arc::from(owner)).or_default() += 1;
38 }
39 }
40 }
41 counts
42 }
43
44 fn least_loaded(live_ids: &[ServerId], counts: &HashMap<Arc<str>, usize>) -> Option<ServerId> {
46 live_ids
47 .iter()
48 .min_by_key(|id| counts.get(id.0.as_ref()).copied().unwrap_or(0))
49 .cloned()
50 }
51
52 pub fn claim_orphaned(ctx: &CellServerCtx) -> Result<(), PersistError> {
54 let live_ids = Self::live_server_ids(ctx);
55 if live_ids.is_empty() {
56 log::warn!("[ServerOwnership] No live servers found, skipping orphan claim");
57 return Ok(());
58 }
59
60 let live_set: HashSet<&str> = live_ids.iter().map(|id| id.0.as_ref()).collect();
61 let mut counts = Self::count_distribution(ctx);
62 counts.retain(|k, _| live_set.contains(k.as_ref()));
63
64 let mut reassigned = 0usize;
65
66 for reg in iter_server_owned_registrations() {
67 let store = ctx.registry.get_or_create(reg.entity_type);
68 let items: Vec<_> = store.snapshot().into_iter().map(|(_, item)| item).collect();
69
70 for item in &items {
71 let current_owner = item.server_owner().unwrap_or("");
72
73 if !current_owner.is_empty() && live_set.contains(current_owner) {
74 continue; }
76
77 let Some(new_owner) = Self::least_loaded(&live_ids, &counts) else {
78 continue;
79 };
80
81 if let Some(patched) = item.bake_server_owner(&new_owner.0) {
82 ctx.set_dyn_with_options(
83 patched,
84 Some(EventOptions {
85 prevent_relationship_updates: true,
86 ..Default::default()
87 }),
88 )?;
89 *counts.entry(new_owner.0.clone()).or_default() += 1;
90 reassigned += 1;
91 }
92 }
93 }
94
95 if reassigned > 0 {
96 log::info!(
97 "[ServerOwnership] Reassigned {} orphaned item(s)",
98 reassigned
99 );
100 }
101 Ok(())
102 }
103
104 pub fn watch_peer_deaths(ctx: &CellServerCtx) -> hyphae::SubscriptionGuard {
107 use hyphae::{Gettable, Signal, Watchable};
108
109 let req = ctx.new_server_transaction();
110 let servers_cell = ctx.query_map(GetAllServers {}, req).items();
111
112 let prev_ids: std::sync::Mutex<HashSet<Arc<str>>> =
113 std::sync::Mutex::new(servers_cell.get().iter().map(|s| s.id.0.clone()).collect());
114
115 let ctx = ctx.clone();
116 servers_cell.subscribe(move |signal| {
117 let Signal::Value(servers) = signal else {
118 return;
119 };
120
121 let current_ids: HashSet<Arc<str>> = servers.iter().map(|s| s.id.0.clone()).collect();
122
123 let mut prev = prev_ids.lock().unwrap();
124 let removed: Vec<Arc<str>> = prev.difference(¤t_ids).cloned().collect();
125 *prev = current_ids;
126 drop(prev);
127
128 if removed.is_empty() {
129 return;
130 }
131
132 for id in &removed {
133 log::warn!(
134 "[ServerOwnership] Server {} left cluster, redistributing",
135 id
136 );
137 }
138
139 if let Err(e) = ServerOwnershipManager::claim_orphaned(&ctx) {
140 log::error!("[ServerOwnership] Failed to redistribute: {}", e);
141 }
142 })
143 }
144}