1use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use thiserror::Error;
7
8use super::event_store::{CanonicalEventId, EventCursor, EventScope};
9use crate::contract::scope::{ScopeId, scoped_key, unscoped_key};
10use std::sync::Arc;
11
12#[derive(Debug, Error, Clone, PartialEq, Eq)]
14pub enum ProtocolReplayError {
15 #[error("validation error: {0}")]
17 Validation(String),
18 #[error("conflict: {0}")]
20 Conflict(String),
21 #[error("cursor expired: {0}")]
23 CursorExpired(String),
24 #[error("integrity error: {0}")]
26 Integrity(String),
27 #[error("io error: {0}")]
29 Io(String),
30 #[error("serialization error: {0}")]
32 Serialization(String),
33}
34
35#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
37#[serde(transparent)]
38pub struct ProtocolReplayId(String);
39
40impl ProtocolReplayId {
41 pub fn new(value: impl Into<String>) -> Result<Self, ProtocolReplayError> {
42 let value = value.into();
43 reject_blank("protocol_replay_id", &value)?;
44 Ok(Self(value))
45 }
46
47 #[must_use]
48 pub fn as_str(&self) -> &str {
49 &self.0
50 }
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
55#[serde(transparent)]
56pub struct ProtocolReplayCursor(String);
57
58impl ProtocolReplayCursor {
59 pub fn new(value: impl Into<String>) -> Result<Self, ProtocolReplayError> {
60 let value = value.into();
61 reject_blank("protocol_replay_cursor", &value)?;
62 Ok(Self(value))
63 }
64
65 #[must_use]
66 pub fn as_str(&self) -> &str {
67 &self.0
68 }
69}
70
71#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
73#[serde(rename_all = "snake_case")]
74pub enum ProtocolReplayRedactionState {
75 #[default]
77 Clear,
78 Redacted,
80}
81
82#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
84pub struct SourceEventCursor {
85 pub event_id: CanonicalEventId,
86 pub scope: EventScope,
87 pub cursor: EventCursor,
88}
89
90impl SourceEventCursor {
91 #[must_use]
92 pub fn new(event_id: CanonicalEventId, scope: EventScope, cursor: EventCursor) -> Self {
93 Self {
94 event_id,
95 scope,
96 cursor,
97 }
98 }
99}
100
101#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
103pub struct ProtocolReplayDraft {
104 pub stream_id: String,
105 pub protocol: String,
106 pub protocol_version: String,
107 pub projector_version: String,
108 pub wire_event_id: String,
109 pub wire_event_type: String,
110 pub wire_payload_bytes: Vec<u8>,
111 #[serde(default, skip_serializing_if = "Option::is_none")]
112 pub wire_payload_json: Option<Value>,
113 #[serde(default, skip_serializing_if = "Vec::is_empty")]
114 pub source_event_ids: Vec<CanonicalEventId>,
115 #[serde(default, skip_serializing_if = "Vec::is_empty")]
116 pub source_event_cursors: Vec<SourceEventCursor>,
117 #[serde(default)]
118 pub redaction_state: ProtocolReplayRedactionState,
119 #[serde(default, skip_serializing_if = "Option::is_none")]
120 pub expires_at: Option<u64>,
121}
122
123impl ProtocolReplayDraft {
124 pub fn new(
126 stream_id: impl Into<String>,
127 protocol: impl Into<String>,
128 protocol_version: impl Into<String>,
129 projector_version: impl Into<String>,
130 wire_event_id: impl Into<String>,
131 wire_event_type: impl Into<String>,
132 wire_payload_bytes: Vec<u8>,
133 ) -> Result<Self, ProtocolReplayError> {
134 let draft = Self {
135 stream_id: stream_id.into(),
136 protocol: protocol.into(),
137 protocol_version: protocol_version.into(),
138 projector_version: projector_version.into(),
139 wire_event_id: wire_event_id.into(),
140 wire_event_type: wire_event_type.into(),
141 wire_payload_bytes,
142 wire_payload_json: None,
143 source_event_ids: Vec::new(),
144 source_event_cursors: Vec::new(),
145 redaction_state: ProtocolReplayRedactionState::default(),
146 expires_at: None,
147 };
148 draft.validate()?;
149 Ok(draft)
150 }
151
152 pub fn validate(&self) -> Result<(), ProtocolReplayError> {
154 reject_blank("stream_id", &self.stream_id)?;
155 reject_blank("protocol", &self.protocol)?;
156 reject_blank("protocol_version", &self.protocol_version)?;
157 reject_blank("projector_version", &self.projector_version)?;
158 reject_blank("wire_event_id", &self.wire_event_id)?;
159 reject_blank("wire_event_type", &self.wire_event_type)?;
160 if self.wire_payload_bytes.is_empty() {
161 return Err(ProtocolReplayError::Validation(
162 "wire_payload_bytes is required".to_string(),
163 ));
164 }
165 Ok(())
166 }
167}
168
169#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
171pub struct ProtocolReplayRecord {
172 pub protocol_replay_id: ProtocolReplayId,
173 pub stream_id: String,
174 pub protocol: String,
175 pub protocol_version: String,
176 pub projector_version: String,
177 pub wire_event_id: String,
178 pub wire_event_type: String,
179 pub wire_payload_bytes: Vec<u8>,
180 #[serde(default, skip_serializing_if = "Option::is_none")]
181 pub wire_payload_json: Option<Value>,
182 #[serde(default, skip_serializing_if = "Vec::is_empty")]
183 pub source_event_ids: Vec<CanonicalEventId>,
184 #[serde(default, skip_serializing_if = "Vec::is_empty")]
185 pub source_event_cursors: Vec<SourceEventCursor>,
186 pub protocol_replay_cursor: ProtocolReplayCursor,
187 #[serde(default)]
188 pub redaction_state: ProtocolReplayRedactionState,
189 #[serde(default, skip_serializing_if = "Option::is_none")]
190 pub expires_at: Option<u64>,
191 pub created_at: u64,
192}
193
194impl ProtocolReplayRecord {
195 pub fn from_append(
197 protocol_replay_id: ProtocolReplayId,
198 protocol_replay_cursor: ProtocolReplayCursor,
199 created_at: u64,
200 draft: ProtocolReplayDraft,
201 ) -> Result<Self, ProtocolReplayError> {
202 draft.validate()?;
203 Ok(Self {
204 protocol_replay_id,
205 stream_id: draft.stream_id,
206 protocol: draft.protocol,
207 protocol_version: draft.protocol_version,
208 projector_version: draft.projector_version,
209 wire_event_id: draft.wire_event_id,
210 wire_event_type: draft.wire_event_type,
211 wire_payload_bytes: draft.wire_payload_bytes,
212 wire_payload_json: draft.wire_payload_json,
213 source_event_ids: draft.source_event_ids,
214 source_event_cursors: draft.source_event_cursors,
215 protocol_replay_cursor,
216 redaction_state: draft.redaction_state,
217 expires_at: draft.expires_at,
218 created_at,
219 })
220 }
221}
222
223#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
225pub struct ProtocolStreamKey {
226 pub stream_id: String,
227 pub protocol: String,
228 pub protocol_version: String,
229}
230
231impl ProtocolStreamKey {
232 pub fn new(
233 stream_id: impl Into<String>,
234 protocol: impl Into<String>,
235 protocol_version: impl Into<String>,
236 ) -> Result<Self, ProtocolReplayError> {
237 let key = Self {
238 stream_id: stream_id.into(),
239 protocol: protocol.into(),
240 protocol_version: protocol_version.into(),
241 };
242 key.validate()?;
243 Ok(key)
244 }
245
246 pub fn validate(&self) -> Result<(), ProtocolReplayError> {
247 reject_blank("stream_id", &self.stream_id)?;
248 reject_blank("protocol", &self.protocol)?;
249 reject_blank("protocol_version", &self.protocol_version)
250 }
251}
252
253#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
255pub struct ProtocolReplayAppendResult {
256 pub record: ProtocolReplayRecord,
257}
258
259impl ProtocolReplayAppendResult {
260 #[must_use]
261 pub fn protocol_replay_cursor(&self) -> &ProtocolReplayCursor {
262 &self.record.protocol_replay_cursor
263 }
264}
265
266#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
268pub struct ProtocolReplayPage {
269 pub records: Vec<ProtocolReplayRecord>,
270 #[serde(default, skip_serializing_if = "Option::is_none")]
271 pub next_cursor: Option<ProtocolReplayCursor>,
272 pub has_more: bool,
273}
274
275#[async_trait]
277pub trait ProtocolReplayWriter: Send + Sync {
278 async fn append_replay(
279 &self,
280 draft: ProtocolReplayDraft,
281 ) -> Result<ProtocolReplayAppendResult, ProtocolReplayError>;
282}
283
284#[async_trait]
286pub trait ProtocolReplayReader: Send + Sync {
287 async fn list_replay(
288 &self,
289 stream: ProtocolStreamKey,
290 from: Option<ProtocolReplayCursor>,
291 limit: usize,
292 ) -> Result<ProtocolReplayPage, ProtocolReplayError>;
293}
294
295#[async_trait]
297pub trait ProtocolReplayLookup: Send + Sync {
298 async fn load_replay(
299 &self,
300 protocol_replay_id: &ProtocolReplayId,
301 ) -> Result<Option<ProtocolReplayRecord>, ProtocolReplayError>;
302}
303
304pub trait ProtocolReplayLog:
306 ProtocolReplayWriter + ProtocolReplayReader + ProtocolReplayLookup
307{
308}
309
310impl<T> ProtocolReplayLog for T where
311 T: ProtocolReplayWriter + ProtocolReplayReader + ProtocolReplayLookup
312{
313}
314
315fn reject_blank(field: &str, value: &str) -> Result<(), ProtocolReplayError> {
316 if value.trim().is_empty() {
317 return Err(ProtocolReplayError::Validation(format!(
318 "{field} is required"
319 )));
320 }
321 Ok(())
322}
323
324#[derive(Clone)]
325pub struct ScopedProtocolReplayLog {
326 inner: Arc<dyn ProtocolReplayLog>,
327 scope_id: ScopeId,
328}
329
330impl ScopedProtocolReplayLog {
331 pub fn new(inner: Arc<dyn ProtocolReplayLog>, scope_id: ScopeId) -> Self {
332 Self { inner, scope_id }
333 }
334
335 pub fn scope_id(&self) -> &ScopeId {
336 &self.scope_id
337 }
338
339 pub fn inner(&self) -> &dyn ProtocolReplayLog {
340 self.inner.as_ref()
341 }
342
343 fn scoped(&self, value: &str) -> String {
344 scoped_key(&self.scope_id, value)
345 }
346
347 fn unscoped<'a>(&self, value: &'a str) -> Option<&'a str> {
348 unscoped_key(&self.scope_id, value)
349 }
350
351 fn encode_draft(&self, mut draft: ProtocolReplayDraft) -> ProtocolReplayDraft {
352 draft.stream_id = self.scoped(&draft.stream_id);
353 draft
354 }
355
356 fn encode_stream(&self, mut stream: ProtocolStreamKey) -> ProtocolStreamKey {
357 stream.stream_id = self.scoped(&stream.stream_id);
358 stream
359 }
360
361 fn decode_record(&self, mut record: ProtocolReplayRecord) -> Option<ProtocolReplayRecord> {
362 record.stream_id = self.unscoped(&record.stream_id)?.to_string();
363 Some(record)
364 }
365}
366
367#[async_trait]
368impl ProtocolReplayWriter for ScopedProtocolReplayLog {
369 async fn append_replay(
370 &self,
371 draft: ProtocolReplayDraft,
372 ) -> Result<ProtocolReplayAppendResult, ProtocolReplayError> {
373 let result = self.inner.append_replay(self.encode_draft(draft)).await?;
374 let record = self.decode_record(result.record).ok_or_else(|| {
375 ProtocolReplayError::Integrity(
376 "scoped replay log returned a record outside its scope".into(),
377 )
378 })?;
379 Ok(ProtocolReplayAppendResult { record })
380 }
381}
382
383#[async_trait]
384impl ProtocolReplayReader for ScopedProtocolReplayLog {
385 async fn list_replay(
386 &self,
387 stream: ProtocolStreamKey,
388 from: Option<ProtocolReplayCursor>,
389 limit: usize,
390 ) -> Result<ProtocolReplayPage, ProtocolReplayError> {
391 let mut page = self
392 .inner
393 .list_replay(self.encode_stream(stream), from, limit)
394 .await?;
395 page.records = page
396 .records
397 .into_iter()
398 .filter_map(|record| self.decode_record(record))
399 .collect();
400 Ok(page)
401 }
402}
403
404#[async_trait]
405impl ProtocolReplayLookup for ScopedProtocolReplayLog {
406 async fn load_replay(
407 &self,
408 protocol_replay_id: &ProtocolReplayId,
409 ) -> Result<Option<ProtocolReplayRecord>, ProtocolReplayError> {
410 Ok(self
411 .inner
412 .load_replay(protocol_replay_id)
413 .await?
414 .and_then(|record| self.decode_record(record)))
415 }
416}
417
418#[cfg(test)]
419mod tests {
420 use super::*;
421
422 fn replay_draft() -> ProtocolReplayDraft {
423 ProtocolReplayDraft::new(
424 "thread:t1",
425 "ai-sdk",
426 "v6",
427 "projector-1",
428 "evt_wire_1",
429 "agent.message",
430 br#"{"type":"agent.message"}"#.to_vec(),
431 )
432 .unwrap()
433 }
434
435 #[test]
436 fn draft_requires_wire_payload_bytes() {
437 let err = ProtocolReplayDraft::new(
438 "thread:t1",
439 "ai-sdk",
440 "v6",
441 "projector-1",
442 "evt_wire_1",
443 "agent.message",
444 Vec::new(),
445 )
446 .unwrap_err();
447 assert!(
448 matches!(err, ProtocolReplayError::Validation(message) if message.contains("wire_payload"))
449 );
450 }
451
452 #[test]
453 fn stream_key_rejects_blank_protocol() {
454 let err = ProtocolStreamKey::new("thread:t1", " ", "v6").unwrap_err();
455 assert!(
456 matches!(err, ProtocolReplayError::Validation(message) if message.contains("protocol"))
457 );
458 }
459
460 #[test]
461 fn persisted_record_preserves_byte_payload_and_cursor() {
462 let mut draft = replay_draft();
463 draft.wire_payload_json = Some(serde_json::json!({"type":"agent.message"}));
464 let record = ProtocolReplayRecord::from_append(
465 ProtocolReplayId::new("pr_1").unwrap(),
466 ProtocolReplayCursor::new("prcur_1").unwrap(),
467 42,
468 draft,
469 )
470 .unwrap();
471
472 assert_eq!(record.protocol_replay_id.as_str(), "pr_1");
473 assert_eq!(record.protocol_replay_cursor.as_str(), "prcur_1");
474 assert_eq!(record.wire_payload_bytes, br#"{"type":"agent.message"}"#);
475 assert_eq!(record.wire_payload_json.unwrap()["type"], "agent.message");
476 }
477
478 #[test]
479 fn opaque_replay_cursor_roundtrips() {
480 let cursor = ProtocolReplayCursor::new("wirecur_opaque").unwrap();
481 let encoded = serde_json::to_string(&cursor).unwrap();
482 assert_eq!(encoded, "\"wirecur_opaque\"");
483 let decoded: ProtocolReplayCursor = serde_json::from_str(&encoded).unwrap();
484 assert_eq!(decoded.as_str(), "wirecur_opaque");
485 }
486
487 #[derive(Default)]
491 struct InMemReplay {
492 rows: std::sync::Mutex<Vec<ProtocolReplayRecord>>,
493 }
494
495 #[async_trait]
496 impl ProtocolReplayWriter for InMemReplay {
497 async fn append_replay(
498 &self,
499 draft: ProtocolReplayDraft,
500 ) -> Result<ProtocolReplayAppendResult, ProtocolReplayError> {
501 let mut rows = self.rows.lock().unwrap();
502 let n = rows.len() + 1;
503 let record = ProtocolReplayRecord::from_append(
504 ProtocolReplayId::new(format!("pr_{n}"))?,
505 ProtocolReplayCursor::new(format!("prcur_{n}"))?,
506 n as u64,
507 draft,
508 )?;
509 rows.push(record.clone());
510 Ok(ProtocolReplayAppendResult { record })
511 }
512 }
513
514 #[async_trait]
515 impl ProtocolReplayReader for InMemReplay {
516 async fn list_replay(
517 &self,
518 _stream: ProtocolStreamKey,
519 _from: Option<ProtocolReplayCursor>,
520 _limit: usize,
521 ) -> Result<ProtocolReplayPage, ProtocolReplayError> {
522 Ok(ProtocolReplayPage {
523 records: Vec::new(),
524 next_cursor: None,
525 has_more: false,
526 })
527 }
528 }
529
530 #[async_trait]
531 impl ProtocolReplayLookup for InMemReplay {
532 async fn load_replay(
533 &self,
534 protocol_replay_id: &ProtocolReplayId,
535 ) -> Result<Option<ProtocolReplayRecord>, ProtocolReplayError> {
536 Ok(self
537 .rows
538 .lock()
539 .unwrap()
540 .iter()
541 .find(|r| r.protocol_replay_id.as_str() == protocol_replay_id.as_str())
542 .cloned())
543 }
544 }
545
546 #[tokio::test]
547 async fn scoped_load_replay_rejects_cross_scope_id() {
548 use std::sync::Arc;
549 let inner = Arc::new(InMemReplay::default());
550 let scope_a = ScopedProtocolReplayLog::new(inner.clone(), ScopeId::new("scope-a").unwrap());
551 let appended = scope_a.append_replay(replay_draft()).await.unwrap();
552 let id = appended.record.protocol_replay_id.clone();
553
554 assert!(scope_a.load_replay(&id).await.unwrap().is_some());
556
557 let scope_b = ScopedProtocolReplayLog::new(inner, ScopeId::new("scope-b").unwrap());
561 assert!(scope_b.load_replay(&id).await.unwrap().is_none());
562 }
563}