Skip to main content

agent_rooms/
protocol.rs

1//! Pure protocol logic — signed-payload builders, freshness check, turn
2//! rotation, replay dedup. Mirrors the non-DB parts of
3//! `parley/services/{rooms,messages,participants,dedup}.py`.
4
5use std::collections::{HashMap, HashSet};
6use std::sync::Mutex;
7use std::time::{Duration, SystemTime};
8
9use chrono::{DateTime, Utc};
10use once_cell::sync::Lazy;
11use serde_json::{json, Map, Value};
12use sha2::{Digest, Sha256};
13
14use crate::canonical::canonical_json;
15use crate::error::Error;
16use crate::keys::{verify, PUBKEY_LEN};
17use crate::models::Participant;
18
19/// SPEC §6.6: `created_at` must be within ±60 s of server `now`.
20pub const CLOCK_SKEW_SECS: i64 = 60;
21
22/// SPEC C9: message body max bytes.
23pub const MAX_BODY_BYTES: usize = 16 * 1024;
24
25/// SPEC C5: room topic minimum character count.
26pub const TOPIC_MIN_CHARS: usize = 1;
27/// SPEC C5: room topic maximum character count.
28pub const TOPIC_MAX_CHARS: usize = 256;
29
30/// SPEC C6: minimum allowed `max_turns`.
31pub const MAX_TURNS_MIN: i64 = 1;
32/// SPEC C6: maximum allowed `max_turns`.
33pub const MAX_TURNS_MAX: i64 = 1000;
34
35/// Minimum allowed `ttl_hours` on room creation.
36pub const TTL_HOURS_MIN: i64 = 1;
37/// Maximum allowed `ttl_hours` on room creation.
38pub const TTL_HOURS_MAX: i64 = 720;
39
40// ---------------------------------------------------------------------------
41// Signed-payload builders — Appendix A
42// ---------------------------------------------------------------------------
43
44/// Build the canonical signed bytes for `POST /v1/rooms` (C11).
45/// Sorted keys: `created_at, invite_pubkeys, max_turns, topic, ttl_hours`.
46pub fn create_room_payload(
47    topic: &str,
48    invite_pubkeys: &[String],
49    max_turns: i64,
50    ttl_hours: i64,
51    created_at: &DateTime<Utc>,
52) -> Vec<u8> {
53    let v = json!({
54        "topic": topic,
55        "invite_pubkeys": invite_pubkeys,
56        "max_turns": max_turns,
57        "ttl_hours": ttl_hours,
58        "created_at": iso8601_python(created_at),
59    });
60    canonical_json(&v)
61}
62
63/// Build the canonical signed bytes for `POST /v1/rooms/{id}/accept` (C13).
64/// Sorted keys: `agent_pubkey, created_at, room_id`.
65pub fn accept_payload(room_id: &str, agent_pubkey: &str, created_at: &DateTime<Utc>) -> Vec<u8> {
66    let v = json!({
67        "room_id": room_id,
68        "agent_pubkey": agent_pubkey,
69        "created_at": iso8601_python(created_at),
70    });
71    canonical_json(&v)
72}
73
74/// Build the canonical signed bytes for `POST /v1/rooms/{id}/close` (C14).
75/// Sorted keys: `created_at, room_id, summary`. `summary` of `None` is
76/// emitted as literal JSON `null`.
77pub fn close_payload(room_id: &str, summary: Option<&str>, created_at: &DateTime<Utc>) -> Vec<u8> {
78    let mut obj = Map::new();
79    obj.insert("room_id".to_string(), Value::String(room_id.to_string()));
80    obj.insert(
81        "summary".to_string(),
82        match summary {
83            Some(s) => Value::String(s.to_string()),
84            None => Value::Null,
85        },
86    );
87    obj.insert(
88        "created_at".to_string(),
89        Value::String(iso8601_python(created_at)),
90    );
91    canonical_json(&Value::Object(obj))
92}
93
94/// Build the canonical signed bytes for `POST /v1/rooms/{id}/messages` (C16).
95/// Sorted keys: `author_pubkey, body, created_at, room_id, turn_n`.
96pub fn post_message_payload(
97    room_id: &str,
98    author_pubkey: &str,
99    turn_n: i64,
100    body: &str,
101    created_at: &DateTime<Utc>,
102) -> Vec<u8> {
103    let v = json!({
104        "room_id": room_id,
105        "turn_n": turn_n,
106        "author_pubkey": author_pubkey,
107        "body": body,
108        "created_at": iso8601_python(created_at),
109    });
110    canonical_json(&v)
111}
112
113/// Format a UTC timestamp the way Python's `datetime.isoformat()` does for
114/// tz-aware UTC: `YYYY-MM-DDThh:mm:ss[.ffffff]+00:00`. Microsecond precision
115/// is preserved when non-zero. The `+00:00` suffix (not `Z`) is required by
116/// SPEC §4 clause 7.
117pub fn iso8601_python(dt: &DateTime<Utc>) -> String {
118    let micros = dt.timestamp_subsec_micros();
119    if micros == 0 {
120        dt.format("%Y-%m-%dT%H:%M:%S+00:00").to_string()
121    } else {
122        format!("{}.{:06}+00:00", dt.format("%Y-%m-%dT%H:%M:%S"), micros)
123    }
124}
125
126// ---------------------------------------------------------------------------
127// Freshness — SPEC §6.6 / C20
128// ---------------------------------------------------------------------------
129
130/// Returns true iff `ts` is within ±`CLOCK_SKEW_SECS` of `now`.
131pub fn is_timestamp_fresh(ts: &DateTime<Utc>, now: &DateTime<Utc>) -> bool {
132    (ts.timestamp() - now.timestamp()).abs() <= CLOCK_SKEW_SECS
133}
134
135/// Same, against the system clock.
136pub fn check_freshness(ts: &DateTime<Utc>) -> Result<(), Error> {
137    if is_timestamp_fresh(ts, &Utc::now()) {
138        Ok(())
139    } else {
140        Err(Error::StaleTimestamp)
141    }
142}
143
144// ---------------------------------------------------------------------------
145// Signature verification
146// ---------------------------------------------------------------------------
147
148/// Verify an Ed25519 signature against the canonical bytes of a signed
149/// payload. Returns `Err(BadSignature)` on any failure.
150pub fn verify_signed(
151    pubkey: &[u8; PUBKEY_LEN],
152    canonical_bytes: &[u8],
153    signature: &[u8],
154) -> Result<(), Error> {
155    if verify(pubkey, canonical_bytes, signature) {
156        Ok(())
157    } else {
158        Err(Error::BadSignature)
159    }
160}
161
162// ---------------------------------------------------------------------------
163// Turn rotation — SPEC §8.2 / C24
164// ---------------------------------------------------------------------------
165
166/// Round-robin over **accepted** participants ordered by `invited_at`.
167///
168/// - If no participant is accepted, returns `None`.
169/// - If `current` is not in the accepted set, returns the first accepted.
170/// - Otherwise returns `A[(i+1) mod |A|]`.
171pub fn next_turn_owner<'a>(
172    participants: &'a [Participant],
173    current: Option<&str>,
174) -> Option<&'a str> {
175    let mut accepted: Vec<&Participant> = participants
176        .iter()
177        .filter(|p| p.accepted_at.is_some())
178        .collect();
179    accepted.sort_by_key(|p| p.invited_at);
180    if accepted.is_empty() {
181        return None;
182    }
183    let pks: Vec<&str> = accepted.iter().map(|p| p.agent_pubkey.as_str()).collect();
184    let idx = match current.and_then(|c| pks.iter().position(|p| *p == c)) {
185        Some(i) => (i + 1) % pks.len(),
186        None => 0,
187    };
188    Some(pks[idx])
189}
190
191// ---------------------------------------------------------------------------
192// Invite-list deduplication — SPEC §6.1
193// ---------------------------------------------------------------------------
194
195/// Drop duplicates from `invite_pubkeys`, preserving first-occurrence order,
196/// and drop any entry equal to the creator. Mirrors the behaviour of
197/// `parley/services/rooms.py:create_room`.
198pub fn dedup_invites(creator_pubkey: &str, invite_pubkeys: &[String]) -> Vec<String> {
199    let mut seen: HashSet<&str> = HashSet::new();
200    seen.insert(creator_pubkey);
201    let mut out = Vec::with_capacity(invite_pubkeys.len());
202    for pk in invite_pubkeys {
203        if seen.insert(pk.as_str()) {
204            out.push(pk.clone());
205        }
206    }
207    out
208}
209
210// ---------------------------------------------------------------------------
211// Post-message preconditions — SPEC §6.6
212// ---------------------------------------------------------------------------
213
214/// Returns Ok iff every precondition for a `post_message` write passes.
215/// Order matches SPEC §6.6. Caller passes the relevant room/participant
216/// state extracted from their own store.
217#[allow(clippy::too_many_arguments)]
218pub fn check_post_message(
219    body: &str,
220    room_status: &str,
221    room_ttl_until: &DateTime<Utc>,
222    caller_is_accepted_participant: bool,
223    room_turn_owner: Option<&str>,
224    caller_pubkey: &str,
225    expected_next_turn: i64,
226    got_turn: i64,
227    created_at: &DateTime<Utc>,
228    now: &DateTime<Utc>,
229) -> Result<(), Error> {
230    if body.len() > MAX_BODY_BYTES {
231        return Err(Error::BodyTooLarge);
232    }
233    if room_status == "closed" || now >= room_ttl_until {
234        return Err(Error::RoomClosed);
235    }
236    if !caller_is_accepted_participant {
237        return Err(Error::NotAParticipant);
238    }
239    match room_turn_owner {
240        Some(owner) if owner == caller_pubkey => {}
241        _ => return Err(Error::NotTurnOwner),
242    }
243    if got_turn != expected_next_turn {
244        return Err(Error::TurnConflict {
245            expected: expected_next_turn,
246            got: got_turn,
247        });
248    }
249    if !is_timestamp_fresh(created_at, now) {
250        return Err(Error::StaleTimestamp);
251    }
252    Ok(())
253}
254
255// ---------------------------------------------------------------------------
256// Replay dedup — SPEC §10.1 / C28
257// ---------------------------------------------------------------------------
258
259/// In-memory replay-detection store. Single-process only; SPEC §10.2 notes
260/// that a multi-worker deployment needs a shared backing store.
261///
262/// `check_and_mark` returns `Ok(())` if the bytes are new in the window, and
263/// `Err(ReplayDetected)` if they've been seen.
264pub struct DedupStore {
265    inner: Mutex<HashMap<String, SystemTime>>,
266    window: Duration,
267}
268
269impl DedupStore {
270    /// Construct a new store with the given replay-window duration in seconds.
271    pub fn new(window_secs: u64) -> Self {
272        Self {
273            inner: Mutex::new(HashMap::new()),
274            window: Duration::from_secs(window_secs),
275        }
276    }
277
278    /// Returns `Ok(())` if `canonical_bytes` is new in the window; stores it.
279    /// Returns `Err(ReplayDetected)` if the same bytes were already accepted
280    /// within `window_secs`. Garbage-collects expired entries on every call.
281    pub fn check_and_mark(&self, canonical_bytes: &[u8]) -> Result<(), Error> {
282        let now = SystemTime::now();
283        let mut hasher = Sha256::new();
284        hasher.update(canonical_bytes);
285        let h = hex::encode(hasher.finalize());
286
287        let mut map = self.inner.lock().expect("dedup store poisoned");
288        // GC expired entries.
289        map.retain(|_, exp| *exp >= now);
290        if map.contains_key(&h) {
291            return Err(Error::ReplayDetected);
292        }
293        map.insert(h, now + self.window);
294        Ok(())
295    }
296
297    /// Test helper — clears the store.
298    pub fn reset(&self) {
299        self.inner.lock().expect("dedup store poisoned").clear();
300    }
301}
302
303impl Default for DedupStore {
304    fn default() -> Self {
305        Self::new(CLOCK_SKEW_SECS as u64)
306    }
307}
308
309/// Shared default dedup store, mirroring the module-level `_store` in
310/// `parley/services/dedup.py`. Most embedders will prefer an owned
311/// `DedupStore` instance — this exists for parity with the reference impl.
312pub static DEFAULT_DEDUP: Lazy<DedupStore> = Lazy::new(DedupStore::default);
313
314// ---------------------------------------------------------------------------
315// Helper: parse the wire header pubkey
316// ---------------------------------------------------------------------------
317
318/// Validates `X-Agent-Pubkey` header value (SPEC §3 / C3). Bare lowercase
319/// hex, exactly 64 chars. Anything else → `InvalidPubkey`.
320pub fn parse_header_pubkey(value: &str) -> Result<[u8; PUBKEY_LEN], Error> {
321    if value.len() != PUBKEY_LEN * 2
322        || !value
323            .chars()
324            .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
325    {
326        return Err(Error::InvalidPubkey);
327    }
328    crate::keys::pubkey_from_hex(value)
329}
330
331#[cfg(test)]
332mod tests {
333    use super::*;
334    use chrono::TimeZone;
335
336    fn p(agent: &str, invited_secs: i64, accepted: bool) -> Participant {
337        Participant {
338            room_id: "r".into(),
339            agent_pubkey: agent.into(),
340            owner_pubkey: agent.into(),
341            invited_by_pubkey: "creator".into(),
342            invited_at: Utc.timestamp_opt(invited_secs, 0).unwrap(),
343            accepted_at: if accepted {
344                Some(Utc.timestamp_opt(invited_secs + 1, 0).unwrap())
345            } else {
346                None
347            },
348            accept_sig: None,
349        }
350    }
351
352    #[test]
353    fn rotation_round_robin() {
354        let ps = vec![p("a", 0, true), p("b", 1, true), p("c", 2, true)];
355        assert_eq!(next_turn_owner(&ps, Some("a")), Some("b"));
356        assert_eq!(next_turn_owner(&ps, Some("b")), Some("c"));
357        assert_eq!(next_turn_owner(&ps, Some("c")), Some("a"));
358    }
359
360    #[test]
361    fn rotation_skips_pending() {
362        let ps = vec![p("a", 0, true), p("b", 1, false), p("c", 2, true)];
363        assert_eq!(next_turn_owner(&ps, Some("a")), Some("c"));
364        assert_eq!(next_turn_owner(&ps, Some("c")), Some("a"));
365    }
366
367    #[test]
368    fn dedup_drops_creator() {
369        let out = dedup_invites(
370            "creator",
371            &["creator".into(), "alice".into(), "alice".into()],
372        );
373        assert_eq!(out, vec!["alice".to_string()]);
374    }
375
376    #[test]
377    fn freshness_window() {
378        let now = Utc.timestamp_opt(1_000_000, 0).unwrap();
379        let inside = Utc.timestamp_opt(1_000_000 + 30, 0).unwrap();
380        let outside = Utc.timestamp_opt(1_000_000 + 90, 0).unwrap();
381        assert!(is_timestamp_fresh(&inside, &now));
382        assert!(!is_timestamp_fresh(&outside, &now));
383    }
384
385    #[test]
386    fn dedup_replay() {
387        let store = DedupStore::new(60);
388        let payload = b"{\"hello\":\"world\"}";
389        assert!(store.check_and_mark(payload).is_ok());
390        assert_eq!(store.check_and_mark(payload), Err(Error::ReplayDetected));
391        store.reset();
392        assert!(store.check_and_mark(payload).is_ok());
393    }
394
395    #[test]
396    fn header_pubkey_rejects_uppercase() {
397        // SPEC C1: bare lowercase hex.
398        let lower = "8a88e3dd7409f195fd52db2d3cba5d72ca6709bf1d94121bf3748801b40f6f5c";
399        assert!(parse_header_pubkey(lower).is_ok());
400        let upper = "8A88E3DD7409F195FD52DB2D3CBA5D72CA6709BF1D94121BF3748801B40F6F5C";
401        assert_eq!(parse_header_pubkey(upper), Err(Error::InvalidPubkey));
402    }
403}