Skip to main content

myko_server/
server_ownership.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4};
5
6use myko::{
7    entities::server::{GetAllServers, ServerId},
8    event::EventOptions,
9    relationship::iter_server_owned_registrations,
10    server::{CellServerCtx, PersistError},
11};
12
13pub struct ServerOwnershipManager;
14
15impl ServerOwnershipManager {
16    /// Get IDs of all currently live servers.
17    fn live_server_ids(ctx: &CellServerCtx) -> Vec<ServerId> {
18        use hyphae::Gettable;
19        let req = ctx.new_server_transaction();
20        ctx.query_map(GetAllServers {}, req)
21            .items()
22            .get()
23            .iter()
24            .map(|s| s.id.clone())
25            .collect()
26    }
27
28    /// Count how many server_owned items each server currently owns
29    /// across ALL registered #[server_owned] types.
30    fn count_distribution(ctx: &CellServerCtx) -> HashMap<Arc<str>, usize> {
31        let mut counts: HashMap<Arc<str>, usize> = HashMap::new();
32
33        for reg in iter_server_owned_registrations() {
34            let store = ctx.registry.get_or_create(reg.entity_type);
35            for (_, item) in store.snapshot() {
36                if let Some(owner) = item.server_owner() {
37                    *counts.entry(Arc::from(owner)).or_default() += 1;
38                }
39            }
40        }
41        counts
42    }
43
44    /// Pick the server with the lowest item count from the given set.
45    fn least_loaded(live_ids: &[ServerId], counts: &HashMap<Arc<str>, usize>) -> Option<ServerId> {
46        live_ids
47            .iter()
48            .min_by_key(|id| counts.get(id.0.as_ref()).copied().unwrap_or(0))
49            .cloned()
50    }
51
52    /// Scan all server_owned items and reassign any referencing dead/empty servers.
53    pub fn claim_orphaned(ctx: &CellServerCtx) -> Result<(), PersistError> {
54        let live_ids = Self::live_server_ids(ctx);
55        if live_ids.is_empty() {
56            log::warn!("[ServerOwnership] No live servers found, skipping orphan claim");
57            return Ok(());
58        }
59
60        let live_set: HashSet<&str> = live_ids.iter().map(|id| id.0.as_ref()).collect();
61        let mut counts = Self::count_distribution(ctx);
62        counts.retain(|k, _| live_set.contains(k.as_ref()));
63
64        let mut reassigned = 0usize;
65
66        for reg in iter_server_owned_registrations() {
67            let store = ctx.registry.get_or_create(reg.entity_type);
68            let items: Vec<_> = store.snapshot().into_iter().map(|(_, item)| item).collect();
69
70            for item in &items {
71                let current_owner = item.server_owner().unwrap_or("");
72
73                if !current_owner.is_empty() && live_set.contains(current_owner) {
74                    continue; // healthy
75                }
76
77                let Some(new_owner) = Self::least_loaded(&live_ids, &counts) else {
78                    continue;
79                };
80
81                if let Some(patched) = item.bake_server_owner(&new_owner.0) {
82                    ctx.set_dyn_with_options(
83                        patched,
84                        Some(EventOptions {
85                            prevent_relationship_updates: true,
86                            ..Default::default()
87                        }),
88                    )?;
89                    *counts.entry(new_owner.0.clone()).or_default() += 1;
90                    reassigned += 1;
91                }
92            }
93        }
94
95        if reassigned > 0 {
96            log::info!(
97                "[ServerOwnership] Reassigned {} orphaned item(s)",
98                reassigned
99            );
100        }
101        Ok(())
102    }
103
104    /// Watch for Server entity removals and redistribute orphaned items.
105    /// Returns a SubscriptionGuard that must be kept alive.
106    pub fn watch_peer_deaths(ctx: &CellServerCtx) -> hyphae::SubscriptionGuard {
107        use hyphae::{Gettable, Signal, Watchable};
108
109        let req = ctx.new_server_transaction();
110        let servers_cell = ctx.query_map(GetAllServers {}, req).items();
111
112        let prev_ids: std::sync::Mutex<HashSet<Arc<str>>> =
113            std::sync::Mutex::new(servers_cell.get().iter().map(|s| s.id.0.clone()).collect());
114
115        let ctx = ctx.clone();
116        servers_cell.subscribe(move |signal| {
117            let Signal::Value(servers) = signal else {
118                return;
119            };
120
121            let current_ids: HashSet<Arc<str>> = servers.iter().map(|s| s.id.0.clone()).collect();
122
123            let mut prev = prev_ids.lock().unwrap();
124            let removed: Vec<Arc<str>> = prev.difference(&current_ids).cloned().collect();
125            *prev = current_ids;
126            drop(prev);
127
128            if removed.is_empty() {
129                return;
130            }
131
132            for id in &removed {
133                log::warn!(
134                    "[ServerOwnership] Server {} left cluster, redistributing",
135                    id
136                );
137            }
138
139            if let Err(e) = ServerOwnershipManager::claim_orphaned(&ctx) {
140                log::error!("[ServerOwnership] Failed to redistribute: {}", e);
141            }
142        })
143    }
144}