1use std::collections::{HashMap, HashSet};
6use std::sync::Mutex;
7use std::time::{Duration, SystemTime};
8
9use chrono::{DateTime, Utc};
10use once_cell::sync::Lazy;
11use serde_json::{json, Map, Value};
12use sha2::{Digest, Sha256};
13
14use crate::canonical::canonical_json;
15use crate::error::Error;
16use crate::keys::{verify, PUBKEY_LEN};
17use crate::models::Participant;
18
19pub const CLOCK_SKEW_SECS: i64 = 60;
21
22pub const MAX_BODY_BYTES: usize = 16 * 1024;
24
25pub const TOPIC_MIN_CHARS: usize = 1;
27pub const TOPIC_MAX_CHARS: usize = 256;
29
30pub const MAX_TURNS_MIN: i64 = 1;
32pub const MAX_TURNS_MAX: i64 = 1000;
34
35pub const TTL_HOURS_MIN: i64 = 1;
37pub const TTL_HOURS_MAX: i64 = 720;
39
40pub fn create_room_payload(
47 topic: &str,
48 invite_pubkeys: &[String],
49 max_turns: i64,
50 ttl_hours: i64,
51 created_at: &DateTime<Utc>,
52) -> Vec<u8> {
53 let v = json!({
54 "topic": topic,
55 "invite_pubkeys": invite_pubkeys,
56 "max_turns": max_turns,
57 "ttl_hours": ttl_hours,
58 "created_at": iso8601_python(created_at),
59 });
60 canonical_json(&v)
61}
62
63pub fn accept_payload(room_id: &str, agent_pubkey: &str, created_at: &DateTime<Utc>) -> Vec<u8> {
66 let v = json!({
67 "room_id": room_id,
68 "agent_pubkey": agent_pubkey,
69 "created_at": iso8601_python(created_at),
70 });
71 canonical_json(&v)
72}
73
74pub fn close_payload(room_id: &str, summary: Option<&str>, created_at: &DateTime<Utc>) -> Vec<u8> {
78 let mut obj = Map::new();
79 obj.insert("room_id".to_string(), Value::String(room_id.to_string()));
80 obj.insert(
81 "summary".to_string(),
82 match summary {
83 Some(s) => Value::String(s.to_string()),
84 None => Value::Null,
85 },
86 );
87 obj.insert(
88 "created_at".to_string(),
89 Value::String(iso8601_python(created_at)),
90 );
91 canonical_json(&Value::Object(obj))
92}
93
94pub fn post_message_payload(
97 room_id: &str,
98 author_pubkey: &str,
99 turn_n: i64,
100 body: &str,
101 created_at: &DateTime<Utc>,
102) -> Vec<u8> {
103 let v = json!({
104 "room_id": room_id,
105 "turn_n": turn_n,
106 "author_pubkey": author_pubkey,
107 "body": body,
108 "created_at": iso8601_python(created_at),
109 });
110 canonical_json(&v)
111}
112
113pub fn iso8601_python(dt: &DateTime<Utc>) -> String {
118 let micros = dt.timestamp_subsec_micros();
119 if micros == 0 {
120 dt.format("%Y-%m-%dT%H:%M:%S+00:00").to_string()
121 } else {
122 format!("{}.{:06}+00:00", dt.format("%Y-%m-%dT%H:%M:%S"), micros)
123 }
124}
125
126pub fn is_timestamp_fresh(ts: &DateTime<Utc>, now: &DateTime<Utc>) -> bool {
132 (ts.timestamp() - now.timestamp()).abs() <= CLOCK_SKEW_SECS
133}
134
135pub fn check_freshness(ts: &DateTime<Utc>) -> Result<(), Error> {
137 if is_timestamp_fresh(ts, &Utc::now()) {
138 Ok(())
139 } else {
140 Err(Error::StaleTimestamp)
141 }
142}
143
144pub fn verify_signed(
151 pubkey: &[u8; PUBKEY_LEN],
152 canonical_bytes: &[u8],
153 signature: &[u8],
154) -> Result<(), Error> {
155 if verify(pubkey, canonical_bytes, signature) {
156 Ok(())
157 } else {
158 Err(Error::BadSignature)
159 }
160}
161
162pub fn next_turn_owner<'a>(
172 participants: &'a [Participant],
173 current: Option<&str>,
174) -> Option<&'a str> {
175 let mut accepted: Vec<&Participant> = participants
176 .iter()
177 .filter(|p| p.accepted_at.is_some())
178 .collect();
179 accepted.sort_by_key(|p| p.invited_at);
180 if accepted.is_empty() {
181 return None;
182 }
183 let pks: Vec<&str> = accepted.iter().map(|p| p.agent_pubkey.as_str()).collect();
184 let idx = match current.and_then(|c| pks.iter().position(|p| *p == c)) {
185 Some(i) => (i + 1) % pks.len(),
186 None => 0,
187 };
188 Some(pks[idx])
189}
190
191pub fn dedup_invites(creator_pubkey: &str, invite_pubkeys: &[String]) -> Vec<String> {
199 let mut seen: HashSet<&str> = HashSet::new();
200 seen.insert(creator_pubkey);
201 let mut out = Vec::with_capacity(invite_pubkeys.len());
202 for pk in invite_pubkeys {
203 if seen.insert(pk.as_str()) {
204 out.push(pk.clone());
205 }
206 }
207 out
208}
209
210#[allow(clippy::too_many_arguments)]
218pub fn check_post_message(
219 body: &str,
220 room_status: &str,
221 room_ttl_until: &DateTime<Utc>,
222 caller_is_accepted_participant: bool,
223 room_turn_owner: Option<&str>,
224 caller_pubkey: &str,
225 expected_next_turn: i64,
226 got_turn: i64,
227 created_at: &DateTime<Utc>,
228 now: &DateTime<Utc>,
229) -> Result<(), Error> {
230 if body.len() > MAX_BODY_BYTES {
231 return Err(Error::BodyTooLarge);
232 }
233 if room_status == "closed" || now >= room_ttl_until {
234 return Err(Error::RoomClosed);
235 }
236 if !caller_is_accepted_participant {
237 return Err(Error::NotAParticipant);
238 }
239 match room_turn_owner {
240 Some(owner) if owner == caller_pubkey => {}
241 _ => return Err(Error::NotTurnOwner),
242 }
243 if got_turn != expected_next_turn {
244 return Err(Error::TurnConflict {
245 expected: expected_next_turn,
246 got: got_turn,
247 });
248 }
249 if !is_timestamp_fresh(created_at, now) {
250 return Err(Error::StaleTimestamp);
251 }
252 Ok(())
253}
254
255pub struct DedupStore {
265 inner: Mutex<HashMap<String, SystemTime>>,
266 window: Duration,
267}
268
269impl DedupStore {
270 pub fn new(window_secs: u64) -> Self {
272 Self {
273 inner: Mutex::new(HashMap::new()),
274 window: Duration::from_secs(window_secs),
275 }
276 }
277
278 pub fn check_and_mark(&self, canonical_bytes: &[u8]) -> Result<(), Error> {
282 let now = SystemTime::now();
283 let mut hasher = Sha256::new();
284 hasher.update(canonical_bytes);
285 let h = hex::encode(hasher.finalize());
286
287 let mut map = self.inner.lock().expect("dedup store poisoned");
288 map.retain(|_, exp| *exp >= now);
290 if map.contains_key(&h) {
291 return Err(Error::ReplayDetected);
292 }
293 map.insert(h, now + self.window);
294 Ok(())
295 }
296
297 pub fn reset(&self) {
299 self.inner.lock().expect("dedup store poisoned").clear();
300 }
301}
302
303impl Default for DedupStore {
304 fn default() -> Self {
305 Self::new(CLOCK_SKEW_SECS as u64)
306 }
307}
308
309pub static DEFAULT_DEDUP: Lazy<DedupStore> = Lazy::new(DedupStore::default);
313
314pub fn parse_header_pubkey(value: &str) -> Result<[u8; PUBKEY_LEN], Error> {
321 if value.len() != PUBKEY_LEN * 2
322 || !value
323 .chars()
324 .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
325 {
326 return Err(Error::InvalidPubkey);
327 }
328 crate::keys::pubkey_from_hex(value)
329}
330
331#[cfg(test)]
332mod tests {
333 use super::*;
334 use chrono::TimeZone;
335
336 fn p(agent: &str, invited_secs: i64, accepted: bool) -> Participant {
337 Participant {
338 room_id: "r".into(),
339 agent_pubkey: agent.into(),
340 owner_pubkey: agent.into(),
341 invited_by_pubkey: "creator".into(),
342 invited_at: Utc.timestamp_opt(invited_secs, 0).unwrap(),
343 accepted_at: if accepted {
344 Some(Utc.timestamp_opt(invited_secs + 1, 0).unwrap())
345 } else {
346 None
347 },
348 accept_sig: None,
349 }
350 }
351
352 #[test]
353 fn rotation_round_robin() {
354 let ps = vec![p("a", 0, true), p("b", 1, true), p("c", 2, true)];
355 assert_eq!(next_turn_owner(&ps, Some("a")), Some("b"));
356 assert_eq!(next_turn_owner(&ps, Some("b")), Some("c"));
357 assert_eq!(next_turn_owner(&ps, Some("c")), Some("a"));
358 }
359
360 #[test]
361 fn rotation_skips_pending() {
362 let ps = vec![p("a", 0, true), p("b", 1, false), p("c", 2, true)];
363 assert_eq!(next_turn_owner(&ps, Some("a")), Some("c"));
364 assert_eq!(next_turn_owner(&ps, Some("c")), Some("a"));
365 }
366
367 #[test]
368 fn dedup_drops_creator() {
369 let out = dedup_invites(
370 "creator",
371 &["creator".into(), "alice".into(), "alice".into()],
372 );
373 assert_eq!(out, vec!["alice".to_string()]);
374 }
375
376 #[test]
377 fn freshness_window() {
378 let now = Utc.timestamp_opt(1_000_000, 0).unwrap();
379 let inside = Utc.timestamp_opt(1_000_000 + 30, 0).unwrap();
380 let outside = Utc.timestamp_opt(1_000_000 + 90, 0).unwrap();
381 assert!(is_timestamp_fresh(&inside, &now));
382 assert!(!is_timestamp_fresh(&outside, &now));
383 }
384
385 #[test]
386 fn dedup_replay() {
387 let store = DedupStore::new(60);
388 let payload = b"{\"hello\":\"world\"}";
389 assert!(store.check_and_mark(payload).is_ok());
390 assert_eq!(store.check_and_mark(payload), Err(Error::ReplayDetected));
391 store.reset();
392 assert!(store.check_and_mark(payload).is_ok());
393 }
394
395 #[test]
396 fn header_pubkey_rejects_uppercase() {
397 let lower = "8a88e3dd7409f195fd52db2d3cba5d72ca6709bf1d94121bf3748801b40f6f5c";
399 assert!(parse_header_pubkey(lower).is_ok());
400 let upper = "8A88E3DD7409F195FD52DB2D3CBA5D72CA6709BF1D94121BF3748801B40F6F5C";
401 assert_eq!(parse_header_pubkey(upper), Err(Error::InvalidPubkey));
402 }
403}