1use compact_str::CompactString;
10use serde::de::{self, Deserializer, IgnoredAny, MapAccess, Visitor};
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12use std::fmt;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
20#[serde(tag = "type", rename_all = "lowercase")]
21pub enum TapEvent {
22 Record {
24 id: u64,
26 record: RecordEvent,
28 },
29 Identity {
31 id: u64,
33 identity: IdentityEvent,
35 },
36}
37
38impl TapEvent {
39 pub fn id(&self) -> u64 {
41 match self {
42 TapEvent::Record { id, .. } => *id,
43 TapEvent::Identity { id, .. } => *id,
44 }
45 }
46}
47
48pub fn extract_event_id(json: &str) -> Option<u64> {
64 let mut deserializer = serde_json::Deserializer::from_str(json);
65 deserializer.disable_recursion_limit();
66 EventIdOnly::deserialize(&mut deserializer)
67 .ok()
68 .map(|e| e.id)
69}
70
71#[derive(Debug)]
73struct EventIdOnly {
74 id: u64,
75}
76
77impl<'de> Deserialize<'de> for EventIdOnly {
78 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
79 where
80 D: Deserializer<'de>,
81 {
82 deserializer.deserialize_map(EventIdOnlyVisitor)
83 }
84}
85
86struct EventIdOnlyVisitor;
87
88impl<'de> Visitor<'de> for EventIdOnlyVisitor {
89 type Value = EventIdOnly;
90
91 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
92 formatter.write_str("a map with an 'id' field")
93 }
94
95 fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
96 where
97 M: MapAccess<'de>,
98 {
99 let mut id: Option<u64> = None;
100
101 while let Some(key) = map.next_key::<&str>()? {
102 if key == "id" {
103 id = Some(map.next_value()?);
104 while map.next_entry::<IgnoredAny, IgnoredAny>()?.is_some() {}
107 break;
108 } else {
109 map.next_value::<IgnoredAny>()?;
111 }
112 }
113
114 id.map(|id| EventIdOnly { id })
115 .ok_or_else(|| de::Error::missing_field("id"))
116 }
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct RecordEvent {
125 pub live: bool,
130
131 pub rev: CompactString,
135
136 pub did: Box<str>,
138
139 pub collection: Box<str>,
141
142 pub rkey: CompactString,
146
147 pub action: RecordAction,
149
150 #[serde(skip_serializing_if = "Option::is_none")]
154 pub cid: Option<CompactString>,
155
156 #[serde(skip_serializing_if = "Option::is_none")]
161 pub record: Option<serde_json::Value>,
162}
163
164impl RecordEvent {
165 pub fn parse_record<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
191 match &self.record {
192 Some(value) => serde_json::from_value(value.clone()),
193 None => Err(serde::de::Error::custom("no record data (delete event)")),
194 }
195 }
196
197 pub fn record_value(&self) -> Option<&serde_json::Value> {
199 self.record.as_ref()
200 }
201
202 pub fn is_delete(&self) -> bool {
204 self.action == RecordAction::Delete
205 }
206
207 pub fn at_uri(&self) -> String {
211 format!("at://{}/{}/{}", self.did, self.collection, self.rkey)
212 }
213}
214
215#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
217#[serde(rename_all = "lowercase")]
218pub enum RecordAction {
219 Create,
221 Update,
223 Delete,
225}
226
227impl std::fmt::Display for RecordAction {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 match self {
230 RecordAction::Create => write!(f, "create"),
231 RecordAction::Update => write!(f, "update"),
232 RecordAction::Delete => write!(f, "delete"),
233 }
234 }
235}
236
237#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct IdentityEvent {
242 pub did: Box<str>,
244
245 pub handle: Box<str>,
247
248 #[serde(default)]
250 pub is_active: bool,
251
252 #[serde(default)]
254 pub status: IdentityStatus,
255}
256
257#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
259#[serde(rename_all = "lowercase")]
260pub enum IdentityStatus {
261 #[default]
263 Active,
264 Deactivated,
266 Suspended,
268 Deleted,
270 Takendown,
272}
273
274impl std::fmt::Display for IdentityStatus {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 match self {
277 IdentityStatus::Active => write!(f, "active"),
278 IdentityStatus::Deactivated => write!(f, "deactivated"),
279 IdentityStatus::Suspended => write!(f, "suspended"),
280 IdentityStatus::Deleted => write!(f, "deleted"),
281 IdentityStatus::Takendown => write!(f, "takendown"),
282 }
283 }
284}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289
290 #[test]
291 fn test_parse_record_event() {
292 let json = r#"{
293 "id": 12345,
294 "type": "record",
295 "record": {
296 "live": true,
297 "rev": "3lyileto4q52k",
298 "did": "did:plc:z72i7hdynmk6r22z27h6tvur",
299 "collection": "app.bsky.feed.post",
300 "rkey": "3lyiletddxt2c",
301 "action": "create",
302 "cid": "bafyreigroo6vhxt62ufcndhaxzas6btq4jmniuz4egszbwuqgiyisqwqoy",
303 "record": {"$type": "app.bsky.feed.post", "text": "Hello world!", "createdAt": "2025-01-01T00:00:00Z"}
304 }
305 }"#;
306
307 let event: TapEvent = serde_json::from_str(json).expect("Failed to parse");
308
309 match event {
310 TapEvent::Record { id, record } => {
311 assert_eq!(id, 12345);
312 assert!(record.live);
313 assert_eq!(record.rev.as_str(), "3lyileto4q52k");
314 assert_eq!(&*record.did, "did:plc:z72i7hdynmk6r22z27h6tvur");
315 assert_eq!(&*record.collection, "app.bsky.feed.post");
316 assert_eq!(record.rkey.as_str(), "3lyiletddxt2c");
317 assert_eq!(record.action, RecordAction::Create);
318 assert!(record.cid.is_some());
319 assert!(record.record.is_some());
320
321 #[derive(Deserialize)]
323 struct Post {
324 text: String,
325 }
326 let post: Post = record.parse_record().expect("Failed to parse record");
327 assert_eq!(post.text, "Hello world!");
328 }
329 _ => panic!("Expected Record event"),
330 }
331 }
332
333 #[test]
334 fn test_parse_delete_event() {
335 let json = r#"{
336 "id": 12346,
337 "type": "record",
338 "record": {
339 "live": true,
340 "rev": "3lyileto4q52k",
341 "did": "did:plc:z72i7hdynmk6r22z27h6tvur",
342 "collection": "app.bsky.feed.post",
343 "rkey": "3lyiletddxt2c",
344 "action": "delete"
345 }
346 }"#;
347
348 let event: TapEvent = serde_json::from_str(json).expect("Failed to parse");
349
350 match event {
351 TapEvent::Record { id, record } => {
352 assert_eq!(id, 12346);
353 assert_eq!(record.action, RecordAction::Delete);
354 assert!(record.is_delete());
355 assert!(record.cid.is_none());
356 assert!(record.record.is_none());
357 }
358 _ => panic!("Expected Record event"),
359 }
360 }
361
362 #[test]
363 fn test_parse_identity_event() {
364 let json = r#"{
365 "id": 12347,
366 "type": "identity",
367 "identity": {
368 "did": "did:plc:z72i7hdynmk6r22z27h6tvur",
369 "handle": "user.bsky.social",
370 "is_active": true,
371 "status": "active"
372 }
373 }"#;
374
375 let event: TapEvent = serde_json::from_str(json).expect("Failed to parse");
376
377 match event {
378 TapEvent::Identity { id, identity } => {
379 assert_eq!(id, 12347);
380 assert_eq!(&*identity.did, "did:plc:z72i7hdynmk6r22z27h6tvur");
381 assert_eq!(&*identity.handle, "user.bsky.social");
382 assert!(identity.is_active);
383 assert_eq!(identity.status, IdentityStatus::Active);
384 }
385 _ => panic!("Expected Identity event"),
386 }
387 }
388
389 #[test]
390 fn test_record_action_display() {
391 assert_eq!(RecordAction::Create.to_string(), "create");
392 assert_eq!(RecordAction::Update.to_string(), "update");
393 assert_eq!(RecordAction::Delete.to_string(), "delete");
394 }
395
396 #[test]
397 fn test_identity_status_display() {
398 assert_eq!(IdentityStatus::Active.to_string(), "active");
399 assert_eq!(IdentityStatus::Deactivated.to_string(), "deactivated");
400 assert_eq!(IdentityStatus::Suspended.to_string(), "suspended");
401 assert_eq!(IdentityStatus::Deleted.to_string(), "deleted");
402 assert_eq!(IdentityStatus::Takendown.to_string(), "takendown");
403 }
404
405 #[test]
406 fn test_at_uri() {
407 let record = RecordEvent {
408 live: true,
409 rev: "3lyileto4q52k".into(),
410 did: "did:plc:xyz".into(),
411 collection: "app.bsky.feed.post".into(),
412 rkey: "abc123".into(),
413 action: RecordAction::Create,
414 cid: None,
415 record: None,
416 };
417
418 assert_eq!(
419 record.at_uri(),
420 "at://did:plc:xyz/app.bsky.feed.post/abc123"
421 );
422 }
423
424 #[test]
425 fn test_event_id() {
426 let record_event = TapEvent::Record {
427 id: 100,
428 record: RecordEvent {
429 live: true,
430 rev: "rev".into(),
431 did: "did".into(),
432 collection: "col".into(),
433 rkey: "rkey".into(),
434 action: RecordAction::Create,
435 cid: None,
436 record: None,
437 },
438 };
439 assert_eq!(record_event.id(), 100);
440
441 let identity_event = TapEvent::Identity {
442 id: 200,
443 identity: IdentityEvent {
444 did: "did".into(),
445 handle: "handle".into(),
446 is_active: true,
447 status: IdentityStatus::Active,
448 },
449 };
450 assert_eq!(identity_event.id(), 200);
451 }
452
453 #[test]
454 fn test_extract_event_id_simple() {
455 let json = r#"{"type":"record","id":12345,"record":{"deeply":"nested"}}"#;
456 assert_eq!(extract_event_id(json), Some(12345));
457 }
458
459 #[test]
460 fn test_extract_event_id_at_end() {
461 let json = r#"{"type":"record","record":{"deeply":"nested"},"id":99999}"#;
462 assert_eq!(extract_event_id(json), Some(99999));
463 }
464
465 #[test]
466 fn test_extract_event_id_missing() {
467 let json = r#"{"type":"record","record":{"deeply":"nested"}}"#;
468 assert_eq!(extract_event_id(json), None);
469 }
470
471 #[test]
472 fn test_extract_event_id_invalid_json() {
473 let json = r#"{"type":"record","id":123"#; assert_eq!(extract_event_id(json), None);
475 }
476
477 #[test]
478 fn test_extract_event_id_deeply_nested() {
479 let mut json = String::from(r#"{"id":42,"record":{"nested":"#);
481 for _ in 0..200 {
482 json.push_str("[");
483 }
484 json.push_str("1");
485 for _ in 0..200 {
486 json.push_str("]");
487 }
488 json.push_str("}}");
489
490 assert_eq!(extract_event_id(&json), Some(42));
492 }
493}