Skip to main content

atproto_tap/
events.rs

1//! TAP event types for AT Protocol record and identity events.
2//!
3//! This module defines the event structures received from a TAP service.
4//! Events are optimized for memory efficiency using:
5//! - `CompactString` for small strings (SSO for ≤24 bytes)
6//! - `Box<str>` for immutable strings (no capacity overhead)
7//! - `serde_json::Value` for record payloads (allows lazy access)
8
9use compact_str::CompactString;
10use serde::de::{self, Deserializer, IgnoredAny, MapAccess, Visitor};
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12use std::fmt;
13
14/// A TAP event received from the stream.
15///
16/// TAP delivers two types of events:
17/// - `Record`: Repository record changes (create, update, delete)
18/// - `Identity`: Identity/handle changes for accounts
19#[derive(Debug, Clone, Serialize, Deserialize)]
20#[serde(tag = "type", rename_all = "lowercase")]
21pub enum TapEvent {
22    /// A repository record event (create, update, or delete).
23    Record {
24        /// Sequential event identifier.
25        id: u64,
26        /// The record event data.
27        record: RecordEvent,
28    },
29    /// An identity change event.
30    Identity {
31        /// Sequential event identifier.
32        id: u64,
33        /// The identity event data.
34        identity: IdentityEvent,
35    },
36}
37
38impl TapEvent {
39    /// Returns the event ID.
40    pub fn id(&self) -> u64 {
41        match self {
42            TapEvent::Record { id, .. } => *id,
43            TapEvent::Identity { id, .. } => *id,
44        }
45    }
46}
47
48/// Extract only the event ID from a JSON string without fully parsing it.
49///
50/// This is a fallback parser used when full `TapEvent` parsing fails (e.g., due to
51/// deeply nested records hitting serde_json's recursion limit). It uses `IgnoredAny`
52/// to efficiently skip over nested content without building data structures, allowing
53/// us to extract the ID for acknowledgment even when full parsing fails.
54///
55/// # Example
56///
57/// ```
58/// use atproto_tap::extract_event_id;
59///
60/// let json = r#"{"type":"record","id":12345,"record":{"deeply":"nested"}}"#;
61/// assert_eq!(extract_event_id(json), Some(12345));
62/// ```
63pub fn extract_event_id(json: &str) -> Option<u64> {
64    let mut deserializer = serde_json::Deserializer::from_str(json);
65    deserializer.disable_recursion_limit();
66    EventIdOnly::deserialize(&mut deserializer)
67        .ok()
68        .map(|e| e.id)
69}
70
71/// Internal struct for extracting only the "id" field from a TAP event.
72#[derive(Debug)]
73struct EventIdOnly {
74    id: u64,
75}
76
77impl<'de> Deserialize<'de> for EventIdOnly {
78    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
79    where
80        D: Deserializer<'de>,
81    {
82        deserializer.deserialize_map(EventIdOnlyVisitor)
83    }
84}
85
86struct EventIdOnlyVisitor;
87
88impl<'de> Visitor<'de> for EventIdOnlyVisitor {
89    type Value = EventIdOnly;
90
91    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
92        formatter.write_str("a map with an 'id' field")
93    }
94
95    fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
96    where
97        M: MapAccess<'de>,
98    {
99        let mut id: Option<u64> = None;
100
101        while let Some(key) = map.next_key::<&str>()? {
102            if key == "id" {
103                id = Some(map.next_value()?);
104                // Found what we need - skip the rest efficiently using IgnoredAny
105                // which handles deeply nested structures without recursion issues
106                while map.next_entry::<IgnoredAny, IgnoredAny>()?.is_some() {}
107                break;
108            } else {
109                // Skip this value without fully parsing it
110                map.next_value::<IgnoredAny>()?;
111            }
112        }
113
114        id.map(|id| EventIdOnly { id })
115            .ok_or_else(|| de::Error::missing_field("id"))
116    }
117}
118
119/// A repository record event from TAP.
120///
121/// Contains information about a record change in a user's repository,
122/// including the action taken and the record data (for creates/updates).
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct RecordEvent {
125    /// True if from live firehose, false if from backfill/resync.
126    ///
127    /// During initial sync or recovery, TAP delivers historical events
128    /// with `live: false`. Once caught up, live events have `live: true`.
129    pub live: bool,
130
131    /// Repository revision identifier.
132    ///
133    /// Typically 13 characters, stored inline via CompactString SSO.
134    pub rev: CompactString,
135
136    /// Actor DID (e.g., "did:plc:xyz123").
137    pub did: Box<str>,
138
139    /// Collection NSID (e.g., "app.bsky.feed.post").
140    pub collection: Box<str>,
141
142    /// Record key within the collection.
143    ///
144    /// Typically a TID (13 characters), stored inline via CompactString SSO.
145    pub rkey: CompactString,
146
147    /// The action performed on the record.
148    pub action: RecordAction,
149
150    /// Content identifier (CID) of the record.
151    ///
152    /// Present for create and update actions, absent for delete.
153    #[serde(skip_serializing_if = "Option::is_none")]
154    pub cid: Option<CompactString>,
155
156    /// Record data as JSON value.
157    ///
158    /// Present for create and update actions, absent for delete.
159    /// Use [`parse_record`](Self::parse_record) to deserialize on demand.
160    #[serde(skip_serializing_if = "Option::is_none")]
161    pub record: Option<serde_json::Value>,
162}
163
164impl RecordEvent {
165    /// Parse the record payload into a typed structure.
166    ///
167    /// This method deserializes the raw JSON on demand, avoiding
168    /// unnecessary allocations when the record data isn't needed.
169    ///
170    /// # Errors
171    ///
172    /// Returns an error if the record is absent (delete events) or
173    /// if deserialization fails.
174    ///
175    /// # Example
176    ///
177    /// ```ignore
178    /// use serde::Deserialize;
179    ///
180    /// #[derive(Deserialize)]
181    /// struct Post {
182    ///     text: String,
183    ///     #[serde(rename = "createdAt")]
184    ///     created_at: String,
185    /// }
186    ///
187    /// let post: Post = record_event.parse_record()?;
188    /// println!("Post text: {}", post.text);
189    /// ```
190    pub fn parse_record<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
191        match &self.record {
192            Some(value) => serde_json::from_value(value.clone()),
193            None => Err(serde::de::Error::custom("no record data (delete event)")),
194        }
195    }
196
197    /// Returns the record as a JSON Value reference, if present.
198    pub fn record_value(&self) -> Option<&serde_json::Value> {
199        self.record.as_ref()
200    }
201
202    /// Returns true if this is a delete event.
203    pub fn is_delete(&self) -> bool {
204        self.action == RecordAction::Delete
205    }
206
207    /// Returns the AT-URI for this record.
208    ///
209    /// Format: `at://{did}/{collection}/{rkey}`
210    pub fn at_uri(&self) -> String {
211        format!("at://{}/{}/{}", self.did, self.collection, self.rkey)
212    }
213}
214
215/// The action performed on a record.
216#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
217#[serde(rename_all = "lowercase")]
218pub enum RecordAction {
219    /// A new record was created.
220    Create,
221    /// An existing record was updated.
222    Update,
223    /// A record was deleted.
224    Delete,
225}
226
227impl std::fmt::Display for RecordAction {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        match self {
230            RecordAction::Create => write!(f, "create"),
231            RecordAction::Update => write!(f, "update"),
232            RecordAction::Delete => write!(f, "delete"),
233        }
234    }
235}
236
237/// An identity change event from TAP.
238///
239/// Contains information about handle or account status changes.
240#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct IdentityEvent {
242    /// Actor DID.
243    pub did: Box<str>,
244
245    /// Current handle for the account.
246    pub handle: Box<str>,
247
248    /// Whether the account is currently active.
249    #[serde(default)]
250    pub is_active: bool,
251
252    /// Account status.
253    #[serde(default)]
254    pub status: IdentityStatus,
255}
256
257/// Account status in an identity event.
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
259#[serde(rename_all = "lowercase")]
260pub enum IdentityStatus {
261    /// Account is active and in good standing.
262    #[default]
263    Active,
264    /// Account has been deactivated by the user.
265    Deactivated,
266    /// Account has been suspended.
267    Suspended,
268    /// Account has been deleted.
269    Deleted,
270    /// Account has been taken down.
271    Takendown,
272}
273
274impl std::fmt::Display for IdentityStatus {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        match self {
277            IdentityStatus::Active => write!(f, "active"),
278            IdentityStatus::Deactivated => write!(f, "deactivated"),
279            IdentityStatus::Suspended => write!(f, "suspended"),
280            IdentityStatus::Deleted => write!(f, "deleted"),
281            IdentityStatus::Takendown => write!(f, "takendown"),
282        }
283    }
284}
285
286#[cfg(test)]
287mod tests {
288    use super::*;
289
290    #[test]
291    fn test_parse_record_event() {
292        let json = r#"{
293            "id": 12345,
294            "type": "record",
295            "record": {
296                "live": true,
297                "rev": "3lyileto4q52k",
298                "did": "did:plc:z72i7hdynmk6r22z27h6tvur",
299                "collection": "app.bsky.feed.post",
300                "rkey": "3lyiletddxt2c",
301                "action": "create",
302                "cid": "bafyreigroo6vhxt62ufcndhaxzas6btq4jmniuz4egszbwuqgiyisqwqoy",
303                "record": {"$type": "app.bsky.feed.post", "text": "Hello world!", "createdAt": "2025-01-01T00:00:00Z"}
304            }
305        }"#;
306
307        let event: TapEvent = serde_json::from_str(json).expect("Failed to parse");
308
309        match event {
310            TapEvent::Record { id, record } => {
311                assert_eq!(id, 12345);
312                assert!(record.live);
313                assert_eq!(record.rev.as_str(), "3lyileto4q52k");
314                assert_eq!(&*record.did, "did:plc:z72i7hdynmk6r22z27h6tvur");
315                assert_eq!(&*record.collection, "app.bsky.feed.post");
316                assert_eq!(record.rkey.as_str(), "3lyiletddxt2c");
317                assert_eq!(record.action, RecordAction::Create);
318                assert!(record.cid.is_some());
319                assert!(record.record.is_some());
320
321                // Test lazy parsing
322                #[derive(Deserialize)]
323                struct Post {
324                    text: String,
325                }
326                let post: Post = record.parse_record().expect("Failed to parse record");
327                assert_eq!(post.text, "Hello world!");
328            }
329            _ => panic!("Expected Record event"),
330        }
331    }
332
333    #[test]
334    fn test_parse_delete_event() {
335        let json = r#"{
336            "id": 12346,
337            "type": "record",
338            "record": {
339                "live": true,
340                "rev": "3lyileto4q52k",
341                "did": "did:plc:z72i7hdynmk6r22z27h6tvur",
342                "collection": "app.bsky.feed.post",
343                "rkey": "3lyiletddxt2c",
344                "action": "delete"
345            }
346        }"#;
347
348        let event: TapEvent = serde_json::from_str(json).expect("Failed to parse");
349
350        match event {
351            TapEvent::Record { id, record } => {
352                assert_eq!(id, 12346);
353                assert_eq!(record.action, RecordAction::Delete);
354                assert!(record.is_delete());
355                assert!(record.cid.is_none());
356                assert!(record.record.is_none());
357            }
358            _ => panic!("Expected Record event"),
359        }
360    }
361
362    #[test]
363    fn test_parse_identity_event() {
364        let json = r#"{
365            "id": 12347,
366            "type": "identity",
367            "identity": {
368                "did": "did:plc:z72i7hdynmk6r22z27h6tvur",
369                "handle": "user.bsky.social",
370                "is_active": true,
371                "status": "active"
372            }
373        }"#;
374
375        let event: TapEvent = serde_json::from_str(json).expect("Failed to parse");
376
377        match event {
378            TapEvent::Identity { id, identity } => {
379                assert_eq!(id, 12347);
380                assert_eq!(&*identity.did, "did:plc:z72i7hdynmk6r22z27h6tvur");
381                assert_eq!(&*identity.handle, "user.bsky.social");
382                assert!(identity.is_active);
383                assert_eq!(identity.status, IdentityStatus::Active);
384            }
385            _ => panic!("Expected Identity event"),
386        }
387    }
388
389    #[test]
390    fn test_record_action_display() {
391        assert_eq!(RecordAction::Create.to_string(), "create");
392        assert_eq!(RecordAction::Update.to_string(), "update");
393        assert_eq!(RecordAction::Delete.to_string(), "delete");
394    }
395
396    #[test]
397    fn test_identity_status_display() {
398        assert_eq!(IdentityStatus::Active.to_string(), "active");
399        assert_eq!(IdentityStatus::Deactivated.to_string(), "deactivated");
400        assert_eq!(IdentityStatus::Suspended.to_string(), "suspended");
401        assert_eq!(IdentityStatus::Deleted.to_string(), "deleted");
402        assert_eq!(IdentityStatus::Takendown.to_string(), "takendown");
403    }
404
405    #[test]
406    fn test_at_uri() {
407        let record = RecordEvent {
408            live: true,
409            rev: "3lyileto4q52k".into(),
410            did: "did:plc:xyz".into(),
411            collection: "app.bsky.feed.post".into(),
412            rkey: "abc123".into(),
413            action: RecordAction::Create,
414            cid: None,
415            record: None,
416        };
417
418        assert_eq!(
419            record.at_uri(),
420            "at://did:plc:xyz/app.bsky.feed.post/abc123"
421        );
422    }
423
424    #[test]
425    fn test_event_id() {
426        let record_event = TapEvent::Record {
427            id: 100,
428            record: RecordEvent {
429                live: true,
430                rev: "rev".into(),
431                did: "did".into(),
432                collection: "col".into(),
433                rkey: "rkey".into(),
434                action: RecordAction::Create,
435                cid: None,
436                record: None,
437            },
438        };
439        assert_eq!(record_event.id(), 100);
440
441        let identity_event = TapEvent::Identity {
442            id: 200,
443            identity: IdentityEvent {
444                did: "did".into(),
445                handle: "handle".into(),
446                is_active: true,
447                status: IdentityStatus::Active,
448            },
449        };
450        assert_eq!(identity_event.id(), 200);
451    }
452
453    #[test]
454    fn test_extract_event_id_simple() {
455        let json = r#"{"type":"record","id":12345,"record":{"deeply":"nested"}}"#;
456        assert_eq!(extract_event_id(json), Some(12345));
457    }
458
459    #[test]
460    fn test_extract_event_id_at_end() {
461        let json = r#"{"type":"record","record":{"deeply":"nested"},"id":99999}"#;
462        assert_eq!(extract_event_id(json), Some(99999));
463    }
464
465    #[test]
466    fn test_extract_event_id_missing() {
467        let json = r#"{"type":"record","record":{"deeply":"nested"}}"#;
468        assert_eq!(extract_event_id(json), None);
469    }
470
471    #[test]
472    fn test_extract_event_id_invalid_json() {
473        let json = r#"{"type":"record","id":123"#; // Truncated JSON
474        assert_eq!(extract_event_id(json), None);
475    }
476
477    #[test]
478    fn test_extract_event_id_deeply_nested() {
479        // Create a deeply nested JSON that would exceed serde_json's default recursion limit
480        let mut json = String::from(r#"{"id":42,"record":{"nested":"#);
481        for _ in 0..200 {
482            json.push_str("[");
483        }
484        json.push_str("1");
485        for _ in 0..200 {
486            json.push_str("]");
487        }
488        json.push_str("}}");
489
490        // extract_event_id should still work because it uses IgnoredAny with disabled recursion limit
491        assert_eq!(extract_event_id(&json), Some(42));
492    }
493}