use std::collections::{HashMap, HashSet};
use std::sync::Mutex;
use std::time::{Duration, SystemTime};
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use serde_json::{json, Map, Value};
use sha2::{Digest, Sha256};
use crate::canonical::canonical_json;
use crate::error::Error;
use crate::keys::{verify, PUBKEY_LEN};
use crate::models::Participant;
/// Maximum absolute difference, in seconds, that `is_timestamp_fresh` tolerates between a
/// timestamp and the reference time. The default `DedupStore` window is derived from it.
pub const CLOCK_SKEW_SECS: i64 = 60;
/// Upper bound on a message body's length in UTF-8 bytes, enforced by `check_post_message`.
pub const MAX_BODY_BYTES: usize = 16 * 1024;
/// Lower bound for a room topic's length (presumably in characters; the enforcing code is
/// not part of this section of the file).
pub const TOPIC_MIN_CHARS: usize = 1;
/// Upper bound for a room topic's length (presumably in characters; the enforcing code is
/// not part of this section of the file).
pub const TOPIC_MAX_CHARS: usize = 256;
/// Smallest allowed `max_turns` value (presumably validated at room creation; TODO confirm).
pub const MAX_TURNS_MIN: i64 = 1;
/// Largest allowed `max_turns` value (presumably validated at room creation; TODO confirm).
pub const MAX_TURNS_MAX: i64 = 1000;
/// Smallest allowed `ttl_hours` value (presumably validated at room creation; TODO confirm).
pub const TTL_HOURS_MIN: i64 = 1;
/// Largest allowed `ttl_hours` value (presumably validated at room creation; TODO confirm).
pub const TTL_HOURS_MAX: i64 = 720;
/// Serializes a create-room request into canonical JSON bytes.
///
/// The object carries `topic`, `invite_pubkeys`, `max_turns`, `ttl_hours` and `created_at`
/// (rendered by `iso8601_python`) and is encoded by `canonical_json`.
pub fn create_room_payload(
    topic: &str,
    invite_pubkeys: &[String],
    max_turns: i64,
    ttl_hours: i64,
    created_at: &DateTime<Utc>,
) -> Vec<u8> {
    canonical_json(&json!({
        "topic": topic,
        "invite_pubkeys": invite_pubkeys,
        "max_turns": max_turns,
        "ttl_hours": ttl_hours,
        "created_at": iso8601_python(created_at),
    }))
}
/// Serializes an accept-invite request into canonical JSON bytes.
///
/// The object carries `room_id`, `agent_pubkey` and `created_at` (rendered by
/// `iso8601_python`) and is encoded by `canonical_json`.
pub fn accept_payload(room_id: &str, agent_pubkey: &str, created_at: &DateTime<Utc>) -> Vec<u8> {
    canonical_json(&json!({
        "room_id": room_id,
        "agent_pubkey": agent_pubkey,
        "created_at": iso8601_python(created_at),
    }))
}
/// Serializes a close-room request into canonical JSON bytes.
///
/// The object carries `room_id`, `summary` and `created_at` (rendered by `iso8601_python`).
/// A missing summary is encoded as an explicit JSON `null`, not omitted, so the key is
/// always present in the encoded object.
pub fn close_payload(room_id: &str, summary: Option<&str>, created_at: &DateTime<Utc>) -> Vec<u8> {
    let mut fields = Map::new();
    fields.insert("room_id".to_owned(), Value::from(room_id));
    fields.insert(
        "summary".to_owned(),
        summary.map_or(Value::Null, Value::from),
    );
    fields.insert(
        "created_at".to_owned(),
        Value::from(iso8601_python(created_at)),
    );
    canonical_json(&Value::Object(fields))
}
/// Serializes a post-message request into canonical JSON bytes.
///
/// The object carries `room_id`, `turn_n`, `author_pubkey`, `body` and `created_at`
/// (rendered by `iso8601_python`) and is encoded by `canonical_json`.
pub fn post_message_payload(
    room_id: &str,
    author_pubkey: &str,
    turn_n: i64,
    body: &str,
    created_at: &DateTime<Utc>,
) -> Vec<u8> {
    canonical_json(&json!({
        "room_id": room_id,
        "turn_n": turn_n,
        "author_pubkey": author_pubkey,
        "body": body,
        "created_at": iso8601_python(created_at),
    }))
}
/// Formats a UTC instant as `YYYY-MM-DDTHH:MM:SS[.ffffff]+00:00`.
///
/// The six-digit fractional part is emitted only when the microsecond component is
/// non-zero. As the name suggests, this is presumably meant to match the output of Python's
/// `datetime.isoformat()` for UTC datetimes (TODO confirm against the peer implementation).
pub fn iso8601_python(dt: &DateTime<Utc>) -> String {
    match dt.timestamp_subsec_micros() {
        // Whole second: no fractional component at all.
        0 => dt.format("%Y-%m-%dT%H:%M:%S+00:00").to_string(),
        fraction => format!("{}.{:06}+00:00", dt.format("%Y-%m-%dT%H:%M:%S"), fraction),
    }
}
pub fn is_timestamp_fresh(ts: &DateTime<Utc>, now: &DateTime<Utc>) -> bool {
(ts.timestamp() - now.timestamp()).abs() <= CLOCK_SKEW_SECS
}
pub fn check_freshness(ts: &DateTime<Utc>) -> Result<(), Error> {
if is_timestamp_fresh(ts, &Utc::now()) {
Ok(())
} else {
Err(Error::StaleTimestamp)
}
}
pub fn verify_signed(
pubkey: &[u8; PUBKEY_LEN],
canonical_bytes: &[u8],
signature: &[u8],
) -> Result<(), Error> {
if verify(pubkey, canonical_bytes, signature) {
Ok(())
} else {
Err(Error::BadSignature)
}
}
/// Picks the pubkey of the participant whose turn comes after `current`.
///
/// Only participants with `accepted_at` set take part in the rotation, ordered by
/// `invited_at` (ties keep their input order because the sort is stable). The rotation wraps
/// around; if `current` is `None` or is not an accepted participant, the first participant
/// in the rotation is returned. Returns `None` when nobody has accepted.
pub fn next_turn_owner<'a>(
    participants: &'a [Participant],
    current: Option<&str>,
) -> Option<&'a str> {
    let mut rotation: Vec<&'a Participant> = Vec::new();
    for candidate in participants {
        if candidate.accepted_at.is_some() {
            rotation.push(candidate);
        }
    }
    rotation.sort_by(|lhs, rhs| lhs.invited_at.cmp(&rhs.invited_at));

    let holder_slot = current.and_then(|holder| {
        rotation
            .iter()
            .position(|member| member.agent_pubkey.as_str() == holder)
    });
    // A found holder implies a non-empty rotation, so the modulo cannot divide by zero.
    let next_slot = match holder_slot {
        Some(slot) => (slot + 1) % rotation.len(),
        None => 0,
    };
    // `get` yields `None` for the empty rotation, covering the "nobody accepted" case.
    rotation
        .get(next_slot)
        .copied()
        .map(|member| member.agent_pubkey.as_str())
}
/// Returns `invite_pubkeys` with duplicates and the creator's own key removed.
///
/// The first occurrence of each key is kept and the original order is preserved.
pub fn dedup_invites(creator_pubkey: &str, invite_pubkeys: &[String]) -> Vec<String> {
    // Seeding the set with the creator makes any self-invite look like a duplicate.
    let mut already_listed: HashSet<&str> = std::iter::once(creator_pubkey).collect();
    invite_pubkeys
        .iter()
        .filter_map(|candidate| {
            // `insert` returns true only for keys not seen before.
            already_listed
                .insert(candidate.as_str())
                .then(|| candidate.clone())
        })
        .collect()
}
#[allow(clippy::too_many_arguments)]
pub fn check_post_message(
body: &str,
room_status: &str,
room_ttl_until: &DateTime<Utc>,
caller_is_accepted_participant: bool,
room_turn_owner: Option<&str>,
caller_pubkey: &str,
expected_next_turn: i64,
got_turn: i64,
created_at: &DateTime<Utc>,
now: &DateTime<Utc>,
) -> Result<(), Error> {
if body.len() > MAX_BODY_BYTES {
return Err(Error::BodyTooLarge);
}
if room_status == "closed" || now >= room_ttl_until {
return Err(Error::RoomClosed);
}
if !caller_is_accepted_participant {
return Err(Error::NotAParticipant);
}
match room_turn_owner {
Some(owner) if owner == caller_pubkey => {}
_ => return Err(Error::NotTurnOwner),
}
if got_turn != expected_next_turn {
return Err(Error::TurnConflict {
expected: expected_next_turn,
got: got_turn,
});
}
if !is_timestamp_fresh(created_at, now) {
return Err(Error::StaleTimestamp);
}
Ok(())
}
/// In-memory replay guard that remembers the SHA-256 digest of each payload for a fixed
/// window of time.
pub struct DedupStore {
    /// Hex-encoded SHA-256 digest of a seen payload, mapped to the time its entry expires.
    inner: Mutex<HashMap<String, SystemTime>>,
    /// How long a digest stays remembered after it is first recorded.
    window: Duration,
}

impl DedupStore {
    /// Creates an empty store that remembers each payload for `window_secs` seconds.
    pub fn new(window_secs: u64) -> Self {
        Self {
            inner: Mutex::new(HashMap::new()),
            window: Duration::from_secs(window_secs),
        }
    }

    /// Records `canonical_bytes` as seen, rejecting it if it was already recorded and its
    /// entry has not expired yet.
    ///
    /// Expired entries are purged on every call, so the map only holds payloads seen within
    /// the last `window`.
    ///
    /// # Errors
    /// Returns `Error::ReplayDetected` when an unexpired entry for the same bytes exists.
    ///
    /// # Panics
    /// Panics if the internal mutex was poisoned by a panic in another thread.
    pub fn check_and_mark(&self, canonical_bytes: &[u8]) -> Result<(), Error> {
        let now = SystemTime::now();
        let mut hasher = Sha256::new();
        hasher.update(canonical_bytes);
        let digest = hex::encode(hasher.finalize());

        let mut seen = self.inner.lock().expect("dedup store poisoned");
        seen.retain(|_, expires_at| *expires_at >= now);
        // The entry API decides "replay or first sighting" with a single hash lookup.
        match seen.entry(digest) {
            std::collections::hash_map::Entry::Occupied(_) => {
                return Err(Error::ReplayDetected);
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(now + self.window);
            }
        }
        Ok(())
    }

    /// Forgets every recorded payload.
    ///
    /// # Panics
    /// Panics if the internal mutex was poisoned by a panic in another thread.
    pub fn reset(&self) {
        self.inner.lock().expect("dedup store poisoned").clear();
    }
}

impl Default for DedupStore {
    /// Creates a store whose window is `2 * CLOCK_SKEW_SECS + 1` seconds.
    ///
    /// A timestamp is considered fresh from `CLOCK_SKEW_SECS` before to `CLOCK_SKEW_SECS`
    /// after its own value, so a signed payload can stay acceptable for about twice the
    /// skew. A window of only one skew would forget a payload first seen early in that span
    /// while it is still fresh, allowing it to be replayed. The extra second is slack for
    /// coarse (whole-second) timestamp comparison at the boundaries.
    fn default() -> Self {
        Self::new(2 * (CLOCK_SKEW_SECS as u64) + 1)
    }
}
/// Process-wide `DedupStore`, built with `DedupStore::default` on first access.
pub static DEFAULT_DEDUP: Lazy<DedupStore> = Lazy::new(DedupStore::default);
/// Parses a public key from a header value, accepting only lowercase hex.
///
/// The value must be exactly `PUBKEY_LEN * 2` bytes long and consist solely of the
/// characters `0-9` and `a-f`; decoding is then delegated to `keys::pubkey_from_hex`.
///
/// # Errors
/// Returns `Error::InvalidPubkey` for a wrong length or any character outside lowercase
/// hex (including uppercase `A-F`), and otherwise whatever `pubkey_from_hex` returns.
pub fn parse_header_pubkey(value: &str) -> Result<[u8; PUBKEY_LEN], Error> {
    if value.len() != PUBKEY_LEN * 2 {
        return Err(Error::InvalidPubkey);
    }
    // Non-ASCII characters encode to bytes >= 0x80, which never match either range.
    let is_lower_hex = value
        .bytes()
        .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'));
    if !is_lower_hex {
        return Err(Error::InvalidPubkey);
    }
    crate::keys::pubkey_from_hex(value)
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Builds a participant of room `"r"` invited by `"creator"` at `invited_secs` (Unix
    /// seconds). When `accepted` is set, the acceptance time is one second after the invite.
    fn participant(agent: &str, invited_secs: i64, accepted: bool) -> Participant {
        let invited_at = Utc.timestamp_opt(invited_secs, 0).unwrap();
        let accepted_at = if accepted {
            Some(Utc.timestamp_opt(invited_secs + 1, 0).unwrap())
        } else {
            None
        };
        Participant {
            room_id: "r".into(),
            agent_pubkey: agent.into(),
            owner_pubkey: agent.into(),
            invited_by_pubkey: "creator".into(),
            invited_at,
            accepted_at,
            accept_sig: None,
        }
    }

    /// With everyone accepted, the turn cycles a -> b -> c -> a.
    #[test]
    fn rotation_round_robin() {
        let room = vec![
            participant("a", 0, true),
            participant("b", 1, true),
            participant("c", 2, true),
        ];
        for &(current, expected) in &[("a", "b"), ("b", "c"), ("c", "a")] {
            assert_eq!(next_turn_owner(&room, Some(current)), Some(expected));
        }
    }

    /// A participant who has not accepted is left out of the rotation.
    #[test]
    fn rotation_skips_pending() {
        let room = vec![
            participant("a", 0, true),
            participant("b", 1, false),
            participant("c", 2, true),
        ];
        for &(current, expected) in &[("a", "c"), ("c", "a")] {
            assert_eq!(next_turn_owner(&room, Some(current)), Some(expected));
        }
    }

    /// The creator's own key and repeated keys are removed from the invite list.
    #[test]
    fn dedup_drops_creator() {
        let invites: Vec<String> = vec![
            "creator".to_string(),
            "alice".to_string(),
            "alice".to_string(),
        ];
        let unique = dedup_invites("creator", &invites);
        assert_eq!(unique, vec!["alice".to_string()]);
    }

    /// 30 s of drift is inside the skew window, 90 s is outside.
    #[test]
    fn freshness_window() {
        let base_secs = 1_000_000;
        let now = Utc.timestamp_opt(base_secs, 0).unwrap();
        let inside = Utc.timestamp_opt(base_secs + 30, 0).unwrap();
        let outside = Utc.timestamp_opt(base_secs + 90, 0).unwrap();
        assert!(is_timestamp_fresh(&inside, &now));
        assert!(!is_timestamp_fresh(&outside, &now));
    }

    /// A second sighting of the same bytes is a replay until the store is reset.
    #[test]
    fn dedup_replay() {
        let guard = DedupStore::new(60);
        let payload = b"{\"hello\":\"world\"}";
        assert!(guard.check_and_mark(payload).is_ok());
        assert_eq!(guard.check_and_mark(payload), Err(Error::ReplayDetected));
        guard.reset();
        assert!(guard.check_and_mark(payload).is_ok());
    }

    /// Lowercase hex is accepted; the same key in uppercase is rejected.
    #[test]
    fn header_pubkey_rejects_uppercase() {
        let lower = "8a88e3dd7409f195fd52db2d3cba5d72ca6709bf1d94121bf3748801b40f6f5c";
        let upper = "8A88E3DD7409F195FD52DB2D3CBA5D72CA6709BF1D94121BF3748801B40F6F5C";
        assert!(parse_header_pubkey(lower).is_ok());
        assert_eq!(parse_header_pubkey(upper), Err(Error::InvalidPubkey));
    }
}