Skip to main content

myko_server/
server_ownership.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use myko::{
5    entities::server::{GetAllServers, ServerId},
6    event::EventOptions,
7    relationship::iter_server_owned_registrations,
8    server::{CellServerCtx, PersistError},
9};
10
11pub struct ServerOwnershipManager;
12
13impl ServerOwnershipManager {
14    /// Get IDs of all currently live servers.
15    fn live_server_ids(ctx: &CellServerCtx) -> Vec<ServerId> {
16        use hyphae::Gettable;
17        let req = ctx.new_server_transaction();
18        ctx.query_map(GetAllServers {}, req)
19            .items()
20            .get()
21            .iter()
22            .map(|s| s.id.clone())
23            .collect()
24    }
25
26    /// Count how many server_owned items each server currently owns
27    /// across ALL registered #[server_owned] types.
28    fn count_distribution(ctx: &CellServerCtx) -> HashMap<Arc<str>, usize> {
29        let mut counts: HashMap<Arc<str>, usize> = HashMap::new();
30
31        for reg in iter_server_owned_registrations() {
32            let store = ctx.registry.get_or_create(reg.entity_type);
33            for (_, item) in store.snapshot() {
34                if let Some(owner) = item.server_owner() {
35                    *counts.entry(Arc::from(owner)).or_default() += 1;
36                }
37            }
38        }
39        counts
40    }
41
42    /// Pick the server with the lowest item count from the given set.
43    fn least_loaded(live_ids: &[ServerId], counts: &HashMap<Arc<str>, usize>) -> Option<ServerId> {
44        live_ids
45            .iter()
46            .min_by_key(|id| counts.get(id.0.as_ref()).copied().unwrap_or(0))
47            .cloned()
48    }
49
50    /// Scan all server_owned items and reassign any referencing dead/empty servers.
51    pub fn claim_orphaned(ctx: &CellServerCtx) -> Result<(), PersistError> {
52        let live_ids = Self::live_server_ids(ctx);
53        if live_ids.is_empty() {
54            log::warn!("[ServerOwnership] No live servers found, skipping orphan claim");
55            return Ok(());
56        }
57
58        let live_set: HashSet<&str> = live_ids.iter().map(|id| id.0.as_ref()).collect();
59        let mut counts = Self::count_distribution(ctx);
60        counts.retain(|k, _| live_set.contains(k.as_ref()));
61
62        let mut reassigned = 0usize;
63
64        for reg in iter_server_owned_registrations() {
65            let store = ctx.registry.get_or_create(reg.entity_type);
66            let items: Vec<_> = store.snapshot().into_iter().map(|(_, item)| item).collect();
67
68            for item in &items {
69                let current_owner = item.server_owner().unwrap_or("");
70
71                if !current_owner.is_empty() && live_set.contains(current_owner) {
72                    continue; // healthy
73                }
74
75                let Some(new_owner) = Self::least_loaded(&live_ids, &counts) else {
76                    continue;
77                };
78
79                if let Some(patched) = item.bake_server_owner(&new_owner.0) {
80                    ctx.set_dyn_with_options(
81                        patched,
82                        Some(EventOptions {
83                            prevent_relationship_updates: true,
84                            ..Default::default()
85                        }),
86                    )?;
87                    *counts.entry(new_owner.0.clone()).or_default() += 1;
88                    reassigned += 1;
89                }
90            }
91        }
92
93        if reassigned > 0 {
94            log::info!(
95                "[ServerOwnership] Reassigned {} orphaned item(s)",
96                reassigned
97            );
98        }
99        Ok(())
100    }
101
102    /// Watch for Server entity removals and redistribute orphaned items.
103    /// Returns a SubscriptionGuard that must be kept alive.
104    pub fn watch_peer_deaths(ctx: &CellServerCtx) -> hyphae::SubscriptionGuard {
105        use hyphae::{Gettable, Signal, Watchable};
106
107        let req = ctx.new_server_transaction();
108        let servers_cell = ctx.query_map(GetAllServers {}, req).items();
109
110        let prev_ids: std::sync::Mutex<HashSet<Arc<str>>> =
111            std::sync::Mutex::new(servers_cell.get().iter().map(|s| s.id.0.clone()).collect());
112
113        let ctx = ctx.clone();
114        servers_cell.subscribe(move |signal| {
115            let Signal::Value(servers) = signal else {
116                return;
117            };
118
119            let current_ids: HashSet<Arc<str>> = servers.iter().map(|s| s.id.0.clone()).collect();
120
121            let mut prev = prev_ids.lock().unwrap();
122            let removed: Vec<Arc<str>> = prev.difference(&current_ids).cloned().collect();
123            *prev = current_ids;
124            drop(prev);
125
126            if removed.is_empty() {
127                return;
128            }
129
130            for id in &removed {
131                log::warn!(
132                    "[ServerOwnership] Server {} left cluster, redistributing",
133                    id
134                );
135            }
136
137            if let Err(e) = ServerOwnershipManager::claim_orphaned(&ctx) {
138                log::error!("[ServerOwnership] Failed to redistribute: {}", e);
139            }
140        })
141    }
142}