use std::{collections::HashSet, sync::Arc};
use marshal_entities::{GetAllSessions, Session};
use myko::{
command::{CommandContext, CommandError, CommandHandler},
core::item::Eventable,
myko_command,
prelude::myko_saga,
saga::{SagaContext, SagaHandler},
server::CellServerCtx,
utils::downcast_item,
wire::{MEvent, MEventType},
};
use uuid::Uuid;
pub fn link() {}
pub fn dedupe_nickname(desired: &str, taken: &HashSet<&str>) -> String {
let base = strip_dash_digits(desired);
if !taken.contains(base) {
return base.to_string();
}
let mut n: u32 = 2;
loop {
let candidate = format!("{base}-{n}");
if !taken.contains(candidate.as_str()) {
return candidate;
}
n += 1;
}
}
fn strip_dash_digits(s: &str) -> &str {
let bytes = s.as_bytes();
let mut i = bytes.len();
while i > 0 && bytes[i - 1].is_ascii_digit() {
i -= 1;
}
if i == bytes.len() || i == 0 || bytes[i - 1] != b'-' {
return s;
}
&s[..i - 1]
}
#[myko_saga]
pub struct DedupeNicknameSaga;
impl SagaHandler for DedupeNicknameSaga {
type EventItem = Session;
type Command = DedupeNicknames;
const EVENT_TYPE: MEventType = MEventType::SET;
fn handle(session: Session, _event: MEvent, ctx: Arc<SagaContext>) -> Option<Self::Command> {
let store = ctx.registry.get(Session::ENTITY_NAME_STATIC)?;
let other_nicknames: Vec<String> = store
.snapshot()
.into_iter()
.filter_map(|(_, item)| downcast_item::<Session>(&item))
.filter(|other| other.id != session.id)
.map(|other| other.nickname)
.collect();
let taken: HashSet<&str> = other_nicknames.iter().map(String::as_str).collect();
if !taken.contains(session.nickname.as_str()) {
return None;
}
log::info!(
"[dedupe-nickname] session {} nickname '{}' collides; dispatching dedupe pass",
session.id.0,
session.nickname,
);
Some(DedupeNicknames {})
}
}
#[myko_command]
pub struct DedupeNicknames {}
impl CommandHandler for DedupeNicknames {
fn execute(self, ctx: CommandContext) -> Result<(), CommandError> {
let sessions: Vec<Arc<Session>> = ctx.exec_query(GetAllSessions {})?;
let Some((victim, new_name)) = pick_one_correction(&sessions) else {
return Ok(());
};
log::info!(
"[dedupe-nickname] correcting {} from '{}' → '{}'",
victim.id.0,
victim.nickname,
new_name,
);
let updated = Session {
nickname: new_name,
..(**victim).clone()
};
ctx.emit_set(&updated)?;
Ok(())
}
}
fn pick_one_correction(sessions: &[Arc<Session>]) -> Option<(&Arc<Session>, String)> {
let mut ordered: Vec<usize> = (0..sessions.len()).collect();
ordered.sort_by(|&a, &b| {
sessions[a]
.connected_at
.cmp(&sessions[b].connected_at)
.then_with(|| sessions[a].id.0.as_ref().cmp(sessions[b].id.0.as_ref()))
});
let mut seen: HashSet<&str> = HashSet::new();
for &i in &ordered {
let s = &sessions[i];
if seen.contains(s.nickname.as_str()) {
let taken: HashSet<&str> = sessions
.iter()
.filter(|other| other.id != s.id)
.map(|other| other.nickname.as_str())
.collect();
let new_name = dedupe_nickname(&s.nickname, &taken);
return Some((s, new_name));
}
seen.insert(s.nickname.as_str());
}
None
}
pub fn run_until_converged(ctx: &CellServerCtx, max_passes: usize) -> Result<usize, String> {
let mut applied = 0usize;
for _ in 0..max_passes {
let store = ctx
.registry
.get(Session::ENTITY_NAME_STATIC)
.ok_or_else(|| "Session store not registered".to_string())?;
let snapshot = store.snapshot();
let sessions: Vec<Arc<Session>> = snapshot
.into_iter()
.filter_map(|(_, item)| downcast_item::<Session>(&item).map(Arc::new))
.collect();
let Some((victim, new_name)) = pick_one_correction(&sessions) else {
return Ok(applied);
};
let updated = Session {
nickname: new_name,
..(**victim).clone()
};
let event = MEvent::from_item(&updated, MEventType::SET, &Uuid::new_v4().to_string());
ctx.apply_event_batch(vec![event])
.map_err(|e| format!("apply_event_batch: {e}"))?;
applied += 1;
}
Err(format!(
"dedupe did not converge within {max_passes} passes — likely an idempotency bug",
))
}
#[cfg(test)]
mod tests {
use super::*;
fn taken(items: &[&'static str]) -> HashSet<&'static str> {
items.iter().copied().collect()
}
#[test]
fn no_collision_returns_input_unchanged() {
assert_eq!(dedupe_nickname("marshal", &taken(&[])), "marshal");
assert_eq!(
dedupe_nickname("marshal", &taken(&["other", "another"])),
"marshal",
);
}
#[test]
fn first_collision_picks_two() {
assert_eq!(
dedupe_nickname("marshal", &taken(&["marshal"])),
"marshal-2"
);
}
#[test]
fn walks_to_three_when_two_taken() {
assert_eq!(
dedupe_nickname("marshal", &taken(&["marshal", "marshal-2"])),
"marshal-3",
);
}
#[test]
fn arriving_with_dash_digit_strips_to_root() {
assert_eq!(
dedupe_nickname("marshal-3", &taken(&["marshal"])),
"marshal-2",
);
}
#[test]
fn arriving_with_dash_digit_returns_unchanged_when_not_taken() {
assert_eq!(
dedupe_nickname("marshal-3", &taken(&["marshal", "marshal-2"])),
"marshal-3",
);
}
#[test]
fn strip_dash_digits_handles_edge_cases() {
assert_eq!(strip_dash_digits("marshal"), "marshal");
assert_eq!(strip_dash_digits("foo-"), "foo-");
assert_eq!(strip_dash_digits("12345"), "12345");
assert_eq!(strip_dash_digits("marshal-42"), "marshal");
assert_eq!(strip_dash_digits("foo-bar-7"), "foo-bar");
assert_eq!(strip_dash_digits(""), "");
}
}