Skip to main content

awaken_server_contract/contract/
protocol_replay_log.rs

1//! Protocol wire replay log contracts.
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use thiserror::Error;
7
8use super::event_store::{CanonicalEventId, EventCursor, EventScope};
9use crate::contract::scope::{ScopeId, scoped_key, unscoped_key};
10use std::sync::Arc;
11
12/// Errors returned by protocol replay log implementations.
13#[derive(Debug, Error, Clone, PartialEq, Eq)]
14pub enum ProtocolReplayError {
15    /// The provided input violates the replay-log contract.
16    #[error("validation error: {0}")]
17    Validation(String),
18    /// The wire event already exists for the protocol stream with different data.
19    #[error("conflict: {0}")]
20    Conflict(String),
21    /// The requested cursor is outside retained replay history.
22    #[error("cursor expired: {0}")]
23    CursorExpired(String),
24    /// Replay history is missing a row that should still be retained.
25    #[error("integrity error: {0}")]
26    Integrity(String),
27    /// An I/O error occurred.
28    #[error("io error: {0}")]
29    Io(String),
30    /// A serialization or deserialization error occurred.
31    #[error("serialization error: {0}")]
32    Serialization(String),
33}
34
35/// Stable protocol replay row identifier assigned by a [`ProtocolReplayWriter`].
36#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
37#[serde(transparent)]
38pub struct ProtocolReplayId(String);
39
40impl ProtocolReplayId {
41    pub fn new(value: impl Into<String>) -> Result<Self, ProtocolReplayError> {
42        let value = value.into();
43        reject_blank("protocol_replay_id", &value)?;
44        Ok(Self(value))
45    }
46
47    #[must_use]
48    pub fn as_str(&self) -> &str {
49        &self.0
50    }
51}
52
53/// Opaque cursor for a single protocol replay stream.
54#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
55#[serde(transparent)]
56pub struct ProtocolReplayCursor(String);
57
58impl ProtocolReplayCursor {
59    pub fn new(value: impl Into<String>) -> Result<Self, ProtocolReplayError> {
60        let value = value.into();
61        reject_blank("protocol_replay_cursor", &value)?;
62        Ok(Self(value))
63    }
64
65    #[must_use]
66    pub fn as_str(&self) -> &str {
67        &self.0
68    }
69}
70
71/// Redaction state of a replay row.
72#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
73#[serde(rename_all = "snake_case")]
74pub enum ProtocolReplayRedactionState {
75    /// The payload is replayable as stored.
76    #[default]
77    Clear,
78    /// The original payload was replaced by a redacted/tombstone payload.
79    Redacted,
80}
81
82/// Canonical event cursor referenced by a protocol replay row.
83#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
84pub struct SourceEventCursor {
85    pub event_id: CanonicalEventId,
86    pub scope: EventScope,
87    pub cursor: EventCursor,
88}
89
90impl SourceEventCursor {
91    #[must_use]
92    pub fn new(event_id: CanonicalEventId, scope: EventScope, cursor: EventCursor) -> Self {
93        Self {
94            event_id,
95            scope,
96            cursor,
97        }
98    }
99}
100
101/// ProtocolReplayLog append input. Store-assigned fields are intentionally absent.
102#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
103pub struct ProtocolReplayDraft {
104    pub stream_id: String,
105    pub protocol: String,
106    pub protocol_version: String,
107    pub projector_version: String,
108    pub wire_event_id: String,
109    pub wire_event_type: String,
110    pub wire_payload_bytes: Vec<u8>,
111    #[serde(default, skip_serializing_if = "Option::is_none")]
112    pub wire_payload_json: Option<Value>,
113    #[serde(default, skip_serializing_if = "Vec::is_empty")]
114    pub source_event_ids: Vec<CanonicalEventId>,
115    #[serde(default, skip_serializing_if = "Vec::is_empty")]
116    pub source_event_cursors: Vec<SourceEventCursor>,
117    #[serde(default)]
118    pub redaction_state: ProtocolReplayRedactionState,
119    #[serde(default, skip_serializing_if = "Option::is_none")]
120    pub expires_at: Option<u64>,
121}
122
123impl ProtocolReplayDraft {
124    /// Create and validate a replay draft.
125    pub fn new(
126        stream_id: impl Into<String>,
127        protocol: impl Into<String>,
128        protocol_version: impl Into<String>,
129        projector_version: impl Into<String>,
130        wire_event_id: impl Into<String>,
131        wire_event_type: impl Into<String>,
132        wire_payload_bytes: Vec<u8>,
133    ) -> Result<Self, ProtocolReplayError> {
134        let draft = Self {
135            stream_id: stream_id.into(),
136            protocol: protocol.into(),
137            protocol_version: protocol_version.into(),
138            projector_version: projector_version.into(),
139            wire_event_id: wire_event_id.into(),
140            wire_event_type: wire_event_type.into(),
141            wire_payload_bytes,
142            wire_payload_json: None,
143            source_event_ids: Vec::new(),
144            source_event_cursors: Vec::new(),
145            redaction_state: ProtocolReplayRedactionState::default(),
146            expires_at: None,
147        };
148        draft.validate()?;
149        Ok(draft)
150    }
151
152    /// Validate replay-stream identity and payload presence.
153    pub fn validate(&self) -> Result<(), ProtocolReplayError> {
154        reject_blank("stream_id", &self.stream_id)?;
155        reject_blank("protocol", &self.protocol)?;
156        reject_blank("protocol_version", &self.protocol_version)?;
157        reject_blank("projector_version", &self.projector_version)?;
158        reject_blank("wire_event_id", &self.wire_event_id)?;
159        reject_blank("wire_event_type", &self.wire_event_type)?;
160        if self.wire_payload_bytes.is_empty() {
161            return Err(ProtocolReplayError::Validation(
162                "wire_payload_bytes is required".to_string(),
163            ));
164        }
165        Ok(())
166    }
167}
168
169/// Persisted protocol replay row.
170#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
171pub struct ProtocolReplayRecord {
172    pub protocol_replay_id: ProtocolReplayId,
173    pub stream_id: String,
174    pub protocol: String,
175    pub protocol_version: String,
176    pub projector_version: String,
177    pub wire_event_id: String,
178    pub wire_event_type: String,
179    pub wire_payload_bytes: Vec<u8>,
180    #[serde(default, skip_serializing_if = "Option::is_none")]
181    pub wire_payload_json: Option<Value>,
182    #[serde(default, skip_serializing_if = "Vec::is_empty")]
183    pub source_event_ids: Vec<CanonicalEventId>,
184    #[serde(default, skip_serializing_if = "Vec::is_empty")]
185    pub source_event_cursors: Vec<SourceEventCursor>,
186    pub protocol_replay_cursor: ProtocolReplayCursor,
187    #[serde(default)]
188    pub redaction_state: ProtocolReplayRedactionState,
189    #[serde(default, skip_serializing_if = "Option::is_none")]
190    pub expires_at: Option<u64>,
191    pub created_at: u64,
192}
193
194impl ProtocolReplayRecord {
195    /// Build a persisted replay row from an accepted draft and store output.
196    pub fn from_append(
197        protocol_replay_id: ProtocolReplayId,
198        protocol_replay_cursor: ProtocolReplayCursor,
199        created_at: u64,
200        draft: ProtocolReplayDraft,
201    ) -> Result<Self, ProtocolReplayError> {
202        draft.validate()?;
203        Ok(Self {
204            protocol_replay_id,
205            stream_id: draft.stream_id,
206            protocol: draft.protocol,
207            protocol_version: draft.protocol_version,
208            projector_version: draft.projector_version,
209            wire_event_id: draft.wire_event_id,
210            wire_event_type: draft.wire_event_type,
211            wire_payload_bytes: draft.wire_payload_bytes,
212            wire_payload_json: draft.wire_payload_json,
213            source_event_ids: draft.source_event_ids,
214            source_event_cursors: draft.source_event_cursors,
215            protocol_replay_cursor,
216            redaction_state: draft.redaction_state,
217            expires_at: draft.expires_at,
218            created_at,
219        })
220    }
221}
222
223/// Protocol stream selection.
224#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
225pub struct ProtocolStreamKey {
226    pub stream_id: String,
227    pub protocol: String,
228    pub protocol_version: String,
229}
230
231impl ProtocolStreamKey {
232    pub fn new(
233        stream_id: impl Into<String>,
234        protocol: impl Into<String>,
235        protocol_version: impl Into<String>,
236    ) -> Result<Self, ProtocolReplayError> {
237        let key = Self {
238            stream_id: stream_id.into(),
239            protocol: protocol.into(),
240            protocol_version: protocol_version.into(),
241        };
242        key.validate()?;
243        Ok(key)
244    }
245
246    pub fn validate(&self) -> Result<(), ProtocolReplayError> {
247        reject_blank("stream_id", &self.stream_id)?;
248        reject_blank("protocol", &self.protocol)?;
249        reject_blank("protocol_version", &self.protocol_version)
250    }
251}
252
253/// Result returned by an append call.
254#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
255pub struct ProtocolReplayAppendResult {
256    pub record: ProtocolReplayRecord,
257}
258
259impl ProtocolReplayAppendResult {
260    #[must_use]
261    pub fn protocol_replay_cursor(&self) -> &ProtocolReplayCursor {
262        &self.record.protocol_replay_cursor
263    }
264}
265
266/// Paged protocol replay response.
267#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
268pub struct ProtocolReplayPage {
269    pub records: Vec<ProtocolReplayRecord>,
270    #[serde(default, skip_serializing_if = "Option::is_none")]
271    pub next_cursor: Option<ProtocolReplayCursor>,
272    pub has_more: bool,
273}
274
275/// Append protocol replay rows.
276#[async_trait]
277pub trait ProtocolReplayWriter: Send + Sync {
278    async fn append_replay(
279        &self,
280        draft: ProtocolReplayDraft,
281    ) -> Result<ProtocolReplayAppendResult, ProtocolReplayError>;
282}
283
284/// Read protocol replay history.
285#[async_trait]
286pub trait ProtocolReplayReader: Send + Sync {
287    async fn list_replay(
288        &self,
289        stream: ProtocolStreamKey,
290        from: Option<ProtocolReplayCursor>,
291        limit: usize,
292    ) -> Result<ProtocolReplayPage, ProtocolReplayError>;
293}
294
295/// Load a persisted protocol replay row by durable row identity.
296#[async_trait]
297pub trait ProtocolReplayLookup: Send + Sync {
298    async fn load_replay(
299        &self,
300        protocol_replay_id: &ProtocolReplayId,
301    ) -> Result<Option<ProtocolReplayRecord>, ProtocolReplayError>;
302}
303
304/// Full protocol replay-log capability.
305pub trait ProtocolReplayLog:
306    ProtocolReplayWriter + ProtocolReplayReader + ProtocolReplayLookup
307{
308}
309
310impl<T> ProtocolReplayLog for T where
311    T: ProtocolReplayWriter + ProtocolReplayReader + ProtocolReplayLookup
312{
313}
314
315fn reject_blank(field: &str, value: &str) -> Result<(), ProtocolReplayError> {
316    if value.trim().is_empty() {
317        return Err(ProtocolReplayError::Validation(format!(
318            "{field} is required"
319        )));
320    }
321    Ok(())
322}
323
324#[derive(Clone)]
325pub struct ScopedProtocolReplayLog {
326    inner: Arc<dyn ProtocolReplayLog>,
327    scope_id: ScopeId,
328}
329
330impl ScopedProtocolReplayLog {
331    pub fn new(inner: Arc<dyn ProtocolReplayLog>, scope_id: ScopeId) -> Self {
332        Self { inner, scope_id }
333    }
334
335    pub fn scope_id(&self) -> &ScopeId {
336        &self.scope_id
337    }
338
339    pub fn inner(&self) -> &dyn ProtocolReplayLog {
340        self.inner.as_ref()
341    }
342
343    fn scoped(&self, value: &str) -> String {
344        scoped_key(&self.scope_id, value)
345    }
346
347    fn unscoped<'a>(&self, value: &'a str) -> Option<&'a str> {
348        unscoped_key(&self.scope_id, value)
349    }
350
351    fn encode_draft(&self, mut draft: ProtocolReplayDraft) -> ProtocolReplayDraft {
352        draft.stream_id = self.scoped(&draft.stream_id);
353        draft
354    }
355
356    fn encode_stream(&self, mut stream: ProtocolStreamKey) -> ProtocolStreamKey {
357        stream.stream_id = self.scoped(&stream.stream_id);
358        stream
359    }
360
361    fn decode_record(&self, mut record: ProtocolReplayRecord) -> Option<ProtocolReplayRecord> {
362        record.stream_id = self.unscoped(&record.stream_id)?.to_string();
363        Some(record)
364    }
365}
366
367#[async_trait]
368impl ProtocolReplayWriter for ScopedProtocolReplayLog {
369    async fn append_replay(
370        &self,
371        draft: ProtocolReplayDraft,
372    ) -> Result<ProtocolReplayAppendResult, ProtocolReplayError> {
373        let result = self.inner.append_replay(self.encode_draft(draft)).await?;
374        let record = self.decode_record(result.record).ok_or_else(|| {
375            ProtocolReplayError::Integrity(
376                "scoped replay log returned a record outside its scope".into(),
377            )
378        })?;
379        Ok(ProtocolReplayAppendResult { record })
380    }
381}
382
383#[async_trait]
384impl ProtocolReplayReader for ScopedProtocolReplayLog {
385    async fn list_replay(
386        &self,
387        stream: ProtocolStreamKey,
388        from: Option<ProtocolReplayCursor>,
389        limit: usize,
390    ) -> Result<ProtocolReplayPage, ProtocolReplayError> {
391        let mut page = self
392            .inner
393            .list_replay(self.encode_stream(stream), from, limit)
394            .await?;
395        page.records = page
396            .records
397            .into_iter()
398            .filter_map(|record| self.decode_record(record))
399            .collect();
400        Ok(page)
401    }
402}
403
404#[async_trait]
405impl ProtocolReplayLookup for ScopedProtocolReplayLog {
406    async fn load_replay(
407        &self,
408        protocol_replay_id: &ProtocolReplayId,
409    ) -> Result<Option<ProtocolReplayRecord>, ProtocolReplayError> {
410        Ok(self
411            .inner
412            .load_replay(protocol_replay_id)
413            .await?
414            .and_then(|record| self.decode_record(record)))
415    }
416}
417
418#[cfg(test)]
419mod tests {
420    use super::*;
421
422    fn replay_draft() -> ProtocolReplayDraft {
423        ProtocolReplayDraft::new(
424            "thread:t1",
425            "ai-sdk",
426            "v6",
427            "projector-1",
428            "evt_wire_1",
429            "agent.message",
430            br#"{"type":"agent.message"}"#.to_vec(),
431        )
432        .unwrap()
433    }
434
435    #[test]
436    fn draft_requires_wire_payload_bytes() {
437        let err = ProtocolReplayDraft::new(
438            "thread:t1",
439            "ai-sdk",
440            "v6",
441            "projector-1",
442            "evt_wire_1",
443            "agent.message",
444            Vec::new(),
445        )
446        .unwrap_err();
447        assert!(
448            matches!(err, ProtocolReplayError::Validation(message) if message.contains("wire_payload"))
449        );
450    }
451
452    #[test]
453    fn stream_key_rejects_blank_protocol() {
454        let err = ProtocolStreamKey::new("thread:t1", " ", "v6").unwrap_err();
455        assert!(
456            matches!(err, ProtocolReplayError::Validation(message) if message.contains("protocol"))
457        );
458    }
459
460    #[test]
461    fn persisted_record_preserves_byte_payload_and_cursor() {
462        let mut draft = replay_draft();
463        draft.wire_payload_json = Some(serde_json::json!({"type":"agent.message"}));
464        let record = ProtocolReplayRecord::from_append(
465            ProtocolReplayId::new("pr_1").unwrap(),
466            ProtocolReplayCursor::new("prcur_1").unwrap(),
467            42,
468            draft,
469        )
470        .unwrap();
471
472        assert_eq!(record.protocol_replay_id.as_str(), "pr_1");
473        assert_eq!(record.protocol_replay_cursor.as_str(), "prcur_1");
474        assert_eq!(record.wire_payload_bytes, br#"{"type":"agent.message"}"#);
475        assert_eq!(record.wire_payload_json.unwrap()["type"], "agent.message");
476    }
477
478    #[test]
479    fn opaque_replay_cursor_roundtrips() {
480        let cursor = ProtocolReplayCursor::new("wirecur_opaque").unwrap();
481        let encoded = serde_json::to_string(&cursor).unwrap();
482        assert_eq!(encoded, "\"wirecur_opaque\"");
483        let decoded: ProtocolReplayCursor = serde_json::from_str(&encoded).unwrap();
484        assert_eq!(decoded.as_str(), "wirecur_opaque");
485    }
486
487    // ── scope isolation regression (the scoped lookup must not leak across
488    //    scopes even when the durable id is known) ──
489
490    #[derive(Default)]
491    struct InMemReplay {
492        rows: std::sync::Mutex<Vec<ProtocolReplayRecord>>,
493    }
494
495    #[async_trait]
496    impl ProtocolReplayWriter for InMemReplay {
497        async fn append_replay(
498            &self,
499            draft: ProtocolReplayDraft,
500        ) -> Result<ProtocolReplayAppendResult, ProtocolReplayError> {
501            let mut rows = self.rows.lock().unwrap();
502            let n = rows.len() + 1;
503            let record = ProtocolReplayRecord::from_append(
504                ProtocolReplayId::new(format!("pr_{n}"))?,
505                ProtocolReplayCursor::new(format!("prcur_{n}"))?,
506                n as u64,
507                draft,
508            )?;
509            rows.push(record.clone());
510            Ok(ProtocolReplayAppendResult { record })
511        }
512    }
513
514    #[async_trait]
515    impl ProtocolReplayReader for InMemReplay {
516        async fn list_replay(
517            &self,
518            _stream: ProtocolStreamKey,
519            _from: Option<ProtocolReplayCursor>,
520            _limit: usize,
521        ) -> Result<ProtocolReplayPage, ProtocolReplayError> {
522            Ok(ProtocolReplayPage {
523                records: Vec::new(),
524                next_cursor: None,
525                has_more: false,
526            })
527        }
528    }
529
530    #[async_trait]
531    impl ProtocolReplayLookup for InMemReplay {
532        async fn load_replay(
533            &self,
534            protocol_replay_id: &ProtocolReplayId,
535        ) -> Result<Option<ProtocolReplayRecord>, ProtocolReplayError> {
536            Ok(self
537                .rows
538                .lock()
539                .unwrap()
540                .iter()
541                .find(|r| r.protocol_replay_id.as_str() == protocol_replay_id.as_str())
542                .cloned())
543        }
544    }
545
546    #[tokio::test]
547    async fn scoped_load_replay_rejects_cross_scope_id() {
548        use std::sync::Arc;
549        let inner = Arc::new(InMemReplay::default());
550        let scope_a = ScopedProtocolReplayLog::new(inner.clone(), ScopeId::new("scope-a").unwrap());
551        let appended = scope_a.append_replay(replay_draft()).await.unwrap();
552        let id = appended.record.protocol_replay_id.clone();
553
554        // Same scope resolves the record.
555        assert!(scope_a.load_replay(&id).await.unwrap().is_some());
556
557        // A different scope must NOT resolve it by id: the record's stream_id is
558        // scoped to `scope-a`, so `decode_record` (unscope) fails and the scoped
559        // lookup yields `None` — no cross-scope leak.
560        let scope_b = ScopedProtocolReplayLog::new(inner, ScopeId::new("scope-b").unwrap());
561        assert!(scope_b.load_replay(&id).await.unwrap().is_none());
562    }
563}