bluesky_firehose_stream/
lib.rs

1use std::{convert::Infallible, io::Cursor};
2
3//re-export atrium_api
4pub use atrium_api;
5
6use atrium_api::{
7    app::bsky::{self},
8    com::atproto::sync::subscribe_repos::{Account, Commit, Identity},
9    types::{
10        Collection as _,
11        string::{Datetime, Did, Tid},
12    },
13};
14use rs_car_sync::CarDecodeError;
15use serde::Serialize;
16use serde_ipld_dagcbor::DecodeError;
17use tracing::{error, warn};
18
19pub mod frame;
20#[cfg(feature = "websocket")]
21pub mod subscription;
22
23#[cfg(feature = "prometheus")]
24pub mod metrics;
25
26#[derive(Serialize)]
27#[serde(tag = "kind")]
28#[non_exhaustive]
29pub enum FirehoseMessage {
30    #[serde(rename = "commit")]
31    Commit {
32        did: Did,
33        rev: Tid,
34        time: Datetime,
35        operations: Vec<Operation>,
36        #[serde(skip)]
37        commit: Commit,
38    },
39    #[serde(rename = "identity")]
40    Identity(Identity),
41    #[serde(rename = "account")]
42    Account(Account),
43}
44
45impl FirehoseMessage {
46    pub fn kind(&self) -> FirehoseMessageKind {
47        match self {
48            FirehoseMessage::Commit { .. } => FirehoseMessageKind::Commit,
49            FirehoseMessage::Identity(_object) => FirehoseMessageKind::Identity,
50            FirehoseMessage::Account(_object) => FirehoseMessageKind::Account,
51        }
52    }
53}
54#[derive(Serialize, Debug, Clone, Copy)]
55#[serde(rename_all = "lowercase")]
56pub enum FirehoseMessageKind {
57    Commit,
58    Identity,
59    Account,
60}
61impl FirehoseMessageKind {
62    pub fn as_str(&self) -> &str {
63        match self {
64            FirehoseMessageKind::Commit => "commit",
65            FirehoseMessageKind::Identity => "identity",
66            FirehoseMessageKind::Account => "account",
67        }
68    }
69}
70#[derive(Serialize, Debug)]
71#[serde(untagged)]
72#[non_exhaustive]
73pub enum Record {
74    Unknown(ipld_core::ipld::Ipld),
75    Post(atrium_api::types::Object<bsky::feed::post::RecordData>),
76    Follow(atrium_api::types::Object<bsky::graph::follow::RecordData>),
77    Block(atrium_api::types::Object<bsky::graph::block::RecordData>),
78    Repost(atrium_api::types::Object<bsky::feed::repost::RecordData>),
79    Like(atrium_api::types::Object<bsky::feed::like::RecordData>),
80    Listitem(atrium_api::types::Object<bsky::graph::listitem::RecordData>),
81    Generator(atrium_api::types::Object<bsky::feed::generator::RecordData>),
82    Profile(atrium_api::types::Object<bsky::actor::profile::RecordData>),
83    List(atrium_api::types::Object<bsky::graph::list::RecordData>),
84    Starterpack(atrium_api::types::Object<bsky::graph::starterpack::RecordData>),
85}
86
87#[derive(Serialize)]
88#[serde(tag = "operation", rename_all = "lowercase")]
89pub enum Operation {
90    Create {
91        #[serde(flatten)]
92        operation_meta: OperationMeta,
93        record: Record,
94        cid: String,
95    },
96    Update {
97        #[serde(flatten)]
98        operation_meta: OperationMeta,
99        record: Record,
100        cid: String,
101    },
102    Delete(OperationMeta),
103}
104impl Operation {
105    pub fn kind(&self) -> OperationKind {
106        match self {
107            Operation::Create { .. } => OperationKind::Create,
108            Operation::Update { .. } => OperationKind::Update,
109            Operation::Delete(_) => OperationKind::Delete,
110        }
111    }
112    pub fn operation_meta(&self) -> &OperationMeta {
113        match self {
114            Operation::Create {
115                operation_meta,
116                record: _,
117                cid: _,
118            } => operation_meta,
119            Operation::Update {
120                operation_meta,
121                record: _,
122                cid: _,
123            } => operation_meta,
124            Operation::Delete(operation_meta) => operation_meta,
125        }
126    }
127}
/// Payload-free discriminant of an operation: create, update or delete.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationKind {
    Create,
    Update,
    Delete,
}

impl OperationKind {
    /// Returns the lowercase name of this kind.
    ///
    /// The result is a string literal, so it is `'static` and can outlive the
    /// value it was obtained from (e.g. when used as a metric label).
    pub fn as_str(&self) -> &'static str {
        match self {
            OperationKind::Create => "create",
            OperationKind::Update => "update",
            OperationKind::Delete => "delete",
        }
    }
}
143#[derive(Serialize, Debug)]
144pub struct OperationMeta {
145    pub collection: String,
146    pub rkey: String,
147}
148#[derive(thiserror::Error, Debug)]
149pub enum Error {
150    #[error("Unknown frame type {0}")]
151    UnknownFrameType(String, crate::frame::MessageFrame),
152    #[error("No type in frame")]
153    NoTypeInFrame(crate::frame::MessageFrame),
154    #[error("Error Frame")]
155    FrameError(crate::frame::ErrorFrame),
156    #[error("Frame decode error {0}")]
157    DagCborDecodeError(DecodeError<Infallible>, crate::frame::MessageFrame),
158    #[error("CAR decode error {0}")]
159    CarDecodeError(CarDecodeError, Commit),
160    #[error("No block found for commit {did:?} {0} {operation} {path}", rev.as_str())]
161    NoBlockForCommit {
162        operation: String,
163        rev: Tid,
164        did: Did,
165        path: String,
166    },
167    #[error("Unknown commit operation `{operation}` {}/{}", operation_meta.collection, operation_meta.rkey)]
168    UnknownCommitOperation {
169        operation: String,
170        operation_meta: OperationMeta,
171        record: Record,
172        cid: String,
173    },
174}
175
176impl TryFrom<crate::frame::Frame> for FirehoseMessage {
177    type Error = Error;
178
179    fn try_from(frame: crate::frame::Frame) -> Result<Self, Self::Error> {
180        match frame {
181            crate::frame::Frame::Message(Some(t), message_frame) => match t.as_str() {
182                "#commit" => {
183                    let commit =
184                        serde_ipld_dagcbor::from_slice::<Commit>(message_frame.body.as_slice())
185                            .map_err(|e| Error::DagCborDecodeError(e, message_frame.clone()))?;
186
187                    let mut block_reader = Cursor::new(&commit.blocks);
188                    let (blocks, _) = rs_car_sync::car_read_all(&mut block_reader, true)
189                        .map_err(|e| Error::CarDecodeError(e, commit.clone()))?;
190
191                    let mut operations = Vec::new();
192
193                    for op in &commit.ops {
194                        let (nsid, rkey) = {
195                            let mut split = op.path.split("/");
196                            (split.next().unwrap(), split.next())
197                        };
198                        if op.action == "delete" {
199                            operations.push(Operation::Delete(OperationMeta {
200                                collection: nsid.to_string(),
201                                rkey: rkey.unwrap_or_default().to_string(),
202                            }));
203                            continue;
204                        }
205                        let Some(op_cid_acid) = &op.cid else {
206                            if op.action != "delete" {
207                                warn!("No block cid for op {} {}", op.action, op.path);
208                            } else {
209                            }
210                            continue;
211                        };
212                        let op_cid = op_cid_acid.0;
213
214                        let record = match blocks.iter().find(|(cid, _data)| cid == &op_cid) {
215                            Some(block) => match nsid {
216                                bsky::feed::Post::NSID => {
217                                    Record::Post(serde_ipld_dagcbor::from_slice(&block.1).map_err(
218                                        |e| Error::DagCborDecodeError(e, message_frame.clone()),
219                                    )?)
220                                }
221                                bsky::graph::Follow::NSID => Record::Follow(
222                                    serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
223                                        Error::DagCborDecodeError(e, message_frame.clone())
224                                    })?,
225                                ),
226                                bsky::graph::Block::NSID => Record::Block(
227                                    serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
228                                        Error::DagCborDecodeError(e, message_frame.clone())
229                                    })?,
230                                ),
231                                bsky::feed::Repost::NSID => Record::Repost(
232                                    serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
233                                        Error::DagCborDecodeError(e, message_frame.clone())
234                                    })?,
235                                ),
236                                bsky::feed::Like::NSID => {
237                                    Record::Like(serde_ipld_dagcbor::from_slice(&block.1).map_err(
238                                        |e| Error::DagCborDecodeError(e, message_frame.clone()),
239                                    )?)
240                                }
241                                bsky::graph::Listitem::NSID => Record::Listitem(
242                                    serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
243                                        Error::DagCborDecodeError(e, message_frame.clone())
244                                    })?,
245                                ),
246                                bsky::feed::Generator::NSID => Record::Generator(
247                                    serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
248                                        Error::DagCborDecodeError(e, message_frame.clone())
249                                    })?,
250                                ),
251                                bsky::actor::Profile::NSID => Record::Profile(
252                                    serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
253                                        Error::DagCborDecodeError(e, message_frame.clone())
254                                    })?,
255                                ),
256                                bsky::graph::List::NSID => {
257                                    Record::List(serde_ipld_dagcbor::from_slice(&block.1).map_err(
258                                        |e| Error::DagCborDecodeError(e, message_frame.clone()),
259                                    )?)
260                                }
261                                bsky::graph::Starterpack::NSID => Record::Starterpack(
262                                    serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
263                                        Error::DagCborDecodeError(e, message_frame.clone())
264                                    })?,
265                                ),
266
267                                _ => Record::Unknown(
268                                    serde_ipld_dagcbor::from_slice::<ipld_core::ipld::Ipld>(
269                                        &block.1,
270                                    )
271                                    .map_err(|e| {
272                                        Error::DagCborDecodeError(e, message_frame.clone())
273                                    })?,
274                                ),
275                            },
276                            None => Err(Error::NoBlockForCommit {
277                                operation: op.action.clone(),
278                                rev: commit.rev.clone(),
279                                did: commit.repo.clone(),
280                                path: op.path.clone(),
281                            })?,
282                        };
283                        let operation = match op.action.as_str() {
284                            "create" => Operation::Create {
285                                operation_meta: OperationMeta {
286                                    collection: nsid.to_string(),
287                                    rkey: rkey.unwrap_or_default().to_string(),
288                                },
289                                record,
290                                cid: op_cid.to_string(),
291                            },
292                            "update" => Operation::Update {
293                                operation_meta: OperationMeta {
294                                    collection: nsid.to_string(),
295                                    rkey: rkey.unwrap_or_default().to_string(),
296                                },
297                                record,
298                                cid: op_cid.to_string(),
299                            },
300                            other => Err(Error::UnknownCommitOperation {
301                                operation: other.to_string(),
302                                operation_meta: OperationMeta {
303                                    collection: nsid.to_string(),
304                                    rkey: rkey.unwrap_or_default().to_string(),
305                                },
306                                record,
307                                cid: op_cid.to_string(),
308                            })?,
309                        };
310                        operations.push(operation);
311                    }
312                    Ok(FirehoseMessage::Commit {
313                        operations,
314                        rev: commit.rev.clone(),
315                        time: commit.time.clone(),
316                        did: commit.repo.clone(),
317                        commit,
318                    })
319                }
320                "#account" => Ok(FirehoseMessage::Account(
321                    serde_ipld_dagcbor::from_slice(message_frame.body.as_slice())
322                        .map_err(|e| Error::DagCborDecodeError(e, message_frame))?,
323                )),
324
325                "#identity" => Ok(FirehoseMessage::Identity(
326                    serde_ipld_dagcbor::from_slice(message_frame.body.as_slice())
327                        .map_err(|e| Error::DagCborDecodeError(e, message_frame))?,
328                )),
329                t => Err(Error::UnknownFrameType(t.to_string(), message_frame))?,
330            },
331            crate::frame::Frame::Message(None, message_frame) => {
332                Err(Error::NoTypeInFrame(message_frame))
333            }
334            crate::frame::Frame::Error(error_frame) => Err(Error::FrameError(error_frame)),
335        }
336    }
337}