1use std::{convert::Infallible, io::Cursor};
2
3pub use atrium_api;
5
6use atrium_api::{
7 app::bsky::{self},
8 com::atproto::sync::subscribe_repos::{Account, Commit, Identity},
9 types::{
10 Collection as _,
11 string::{Datetime, Did, Tid},
12 },
13};
14use rs_car_sync::CarDecodeError;
15use serde::Serialize;
16use serde_ipld_dagcbor::DecodeError;
17use tracing::{error, warn};
18
19pub mod frame;
20#[cfg(feature = "websocket")]
21pub mod subscription;
22
23#[cfg(feature = "prometheus")]
24pub mod metrics;
25
26#[derive(Serialize)]
27#[serde(tag = "kind")]
28#[non_exhaustive]
29pub enum FirehoseMessage {
30 #[serde(rename = "commit")]
31 Commit {
32 did: Did,
33 rev: Tid,
34 time: Datetime,
35 operations: Vec<Operation>,
36 #[serde(skip)]
37 commit: Commit,
38 },
39 #[serde(rename = "identity")]
40 Identity(Identity),
41 #[serde(rename = "account")]
42 Account(Account),
43}
44
45impl FirehoseMessage {
46 pub fn kind(&self) -> FirehoseMessageKind {
47 match self {
48 FirehoseMessage::Commit { .. } => FirehoseMessageKind::Commit,
49 FirehoseMessage::Identity(_object) => FirehoseMessageKind::Identity,
50 FirehoseMessage::Account(_object) => FirehoseMessageKind::Account,
51 }
52 }
53}
54#[derive(Serialize, Debug, Clone, Copy)]
55#[serde(rename_all = "lowercase")]
56pub enum FirehoseMessageKind {
57 Commit,
58 Identity,
59 Account,
60}
61impl FirehoseMessageKind {
62 pub fn as_str(&self) -> &str {
63 match self {
64 FirehoseMessageKind::Commit => "commit",
65 FirehoseMessageKind::Identity => "identity",
66 FirehoseMessageKind::Account => "account",
67 }
68 }
69}
70#[derive(Serialize, Debug)]
71#[serde(untagged)]
72#[non_exhaustive]
73pub enum Record {
74 Unknown(ipld_core::ipld::Ipld),
75 Post(atrium_api::types::Object<bsky::feed::post::RecordData>),
76 Follow(atrium_api::types::Object<bsky::graph::follow::RecordData>),
77 Block(atrium_api::types::Object<bsky::graph::block::RecordData>),
78 Repost(atrium_api::types::Object<bsky::feed::repost::RecordData>),
79 Like(atrium_api::types::Object<bsky::feed::like::RecordData>),
80 Listitem(atrium_api::types::Object<bsky::graph::listitem::RecordData>),
81 Generator(atrium_api::types::Object<bsky::feed::generator::RecordData>),
82 Profile(atrium_api::types::Object<bsky::actor::profile::RecordData>),
83 List(atrium_api::types::Object<bsky::graph::list::RecordData>),
84 Starterpack(atrium_api::types::Object<bsky::graph::starterpack::RecordData>),
85}
86
87#[derive(Serialize)]
88#[serde(tag = "operation", rename_all = "lowercase")]
89pub enum Operation {
90 Create {
91 #[serde(flatten)]
92 operation_meta: OperationMeta,
93 record: Record,
94 cid: String,
95 },
96 Update {
97 #[serde(flatten)]
98 operation_meta: OperationMeta,
99 record: Record,
100 cid: String,
101 },
102 Delete(OperationMeta),
103}
104impl Operation {
105 pub fn kind(&self) -> OperationKind {
106 match self {
107 Operation::Create { .. } => OperationKind::Create,
108 Operation::Update { .. } => OperationKind::Update,
109 Operation::Delete(_) => OperationKind::Delete,
110 }
111 }
112 pub fn operation_meta(&self) -> &OperationMeta {
113 match self {
114 Operation::Create {
115 operation_meta,
116 record: _,
117 cid: _,
118 } => operation_meta,
119 Operation::Update {
120 operation_meta,
121 record: _,
122 cid: _,
123 } => operation_meta,
124 Operation::Delete(operation_meta) => operation_meta,
125 }
126 }
127}
128#[derive(Debug, Clone, Copy)]
129pub enum OperationKind {
130 Create,
131 Update,
132 Delete,
133}
134impl OperationKind {
135 pub fn as_str(&self) -> &str {
136 match self {
137 OperationKind::Create => "create",
138 OperationKind::Update => "update",
139 OperationKind::Delete => "delete",
140 }
141 }
142}
143#[derive(Serialize, Debug)]
144pub struct OperationMeta {
145 pub collection: String,
146 pub rkey: String,
147}
148#[derive(thiserror::Error, Debug)]
149pub enum Error {
150 #[error("Unknown frame type {0}")]
151 UnknownFrameType(String, crate::frame::MessageFrame),
152 #[error("No type in frame")]
153 NoTypeInFrame(crate::frame::MessageFrame),
154 #[error("Error Frame")]
155 FrameError(crate::frame::ErrorFrame),
156 #[error("Frame decode error {0}")]
157 DagCborDecodeError(DecodeError<Infallible>, crate::frame::MessageFrame),
158 #[error("CAR decode error {0}")]
159 CarDecodeError(CarDecodeError, Commit),
160 #[error("No block found for commit {did:?} {0} {operation} {path}", rev.as_str())]
161 NoBlockForCommit {
162 operation: String,
163 rev: Tid,
164 did: Did,
165 path: String,
166 },
167 #[error("Unknown commit operation `{operation}` {}/{}", operation_meta.collection, operation_meta.rkey)]
168 UnknownCommitOperation {
169 operation: String,
170 operation_meta: OperationMeta,
171 record: Record,
172 cid: String,
173 },
174}
175
176impl TryFrom<crate::frame::Frame> for FirehoseMessage {
177 type Error = Error;
178
179 fn try_from(frame: crate::frame::Frame) -> Result<Self, Self::Error> {
180 match frame {
181 crate::frame::Frame::Message(Some(t), message_frame) => match t.as_str() {
182 "#commit" => {
183 let commit =
184 serde_ipld_dagcbor::from_slice::<Commit>(message_frame.body.as_slice())
185 .map_err(|e| Error::DagCborDecodeError(e, message_frame.clone()))?;
186
187 let mut block_reader = Cursor::new(&commit.blocks);
188 let (blocks, _) = rs_car_sync::car_read_all(&mut block_reader, true)
189 .map_err(|e| Error::CarDecodeError(e, commit.clone()))?;
190
191 let mut operations = Vec::new();
192
193 for op in &commit.ops {
194 let (nsid, rkey) = {
195 let mut split = op.path.split("/");
196 (split.next().unwrap(), split.next())
197 };
198 if op.action == "delete" {
199 operations.push(Operation::Delete(OperationMeta {
200 collection: nsid.to_string(),
201 rkey: rkey.unwrap_or_default().to_string(),
202 }));
203 continue;
204 }
205 let Some(op_cid_acid) = &op.cid else {
206 if op.action != "delete" {
207 warn!("No block cid for op {} {}", op.action, op.path);
208 } else {
209 }
210 continue;
211 };
212 let op_cid = op_cid_acid.0;
213
214 let record = match blocks.iter().find(|(cid, _data)| cid == &op_cid) {
215 Some(block) => match nsid {
216 bsky::feed::Post::NSID => {
217 Record::Post(serde_ipld_dagcbor::from_slice(&block.1).map_err(
218 |e| Error::DagCborDecodeError(e, message_frame.clone()),
219 )?)
220 }
221 bsky::graph::Follow::NSID => Record::Follow(
222 serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
223 Error::DagCborDecodeError(e, message_frame.clone())
224 })?,
225 ),
226 bsky::graph::Block::NSID => Record::Block(
227 serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
228 Error::DagCborDecodeError(e, message_frame.clone())
229 })?,
230 ),
231 bsky::feed::Repost::NSID => Record::Repost(
232 serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
233 Error::DagCborDecodeError(e, message_frame.clone())
234 })?,
235 ),
236 bsky::feed::Like::NSID => {
237 Record::Like(serde_ipld_dagcbor::from_slice(&block.1).map_err(
238 |e| Error::DagCborDecodeError(e, message_frame.clone()),
239 )?)
240 }
241 bsky::graph::Listitem::NSID => Record::Listitem(
242 serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
243 Error::DagCborDecodeError(e, message_frame.clone())
244 })?,
245 ),
246 bsky::feed::Generator::NSID => Record::Generator(
247 serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
248 Error::DagCborDecodeError(e, message_frame.clone())
249 })?,
250 ),
251 bsky::actor::Profile::NSID => Record::Profile(
252 serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
253 Error::DagCborDecodeError(e, message_frame.clone())
254 })?,
255 ),
256 bsky::graph::List::NSID => {
257 Record::List(serde_ipld_dagcbor::from_slice(&block.1).map_err(
258 |e| Error::DagCborDecodeError(e, message_frame.clone()),
259 )?)
260 }
261 bsky::graph::Starterpack::NSID => Record::Starterpack(
262 serde_ipld_dagcbor::from_slice(&block.1).map_err(|e| {
263 Error::DagCborDecodeError(e, message_frame.clone())
264 })?,
265 ),
266
267 _ => Record::Unknown(
268 serde_ipld_dagcbor::from_slice::<ipld_core::ipld::Ipld>(
269 &block.1,
270 )
271 .map_err(|e| {
272 Error::DagCborDecodeError(e, message_frame.clone())
273 })?,
274 ),
275 },
276 None => Err(Error::NoBlockForCommit {
277 operation: op.action.clone(),
278 rev: commit.rev.clone(),
279 did: commit.repo.clone(),
280 path: op.path.clone(),
281 })?,
282 };
283 let operation = match op.action.as_str() {
284 "create" => Operation::Create {
285 operation_meta: OperationMeta {
286 collection: nsid.to_string(),
287 rkey: rkey.unwrap_or_default().to_string(),
288 },
289 record,
290 cid: op_cid.to_string(),
291 },
292 "update" => Operation::Update {
293 operation_meta: OperationMeta {
294 collection: nsid.to_string(),
295 rkey: rkey.unwrap_or_default().to_string(),
296 },
297 record,
298 cid: op_cid.to_string(),
299 },
300 other => Err(Error::UnknownCommitOperation {
301 operation: other.to_string(),
302 operation_meta: OperationMeta {
303 collection: nsid.to_string(),
304 rkey: rkey.unwrap_or_default().to_string(),
305 },
306 record,
307 cid: op_cid.to_string(),
308 })?,
309 };
310 operations.push(operation);
311 }
312 Ok(FirehoseMessage::Commit {
313 operations,
314 rev: commit.rev.clone(),
315 time: commit.time.clone(),
316 did: commit.repo.clone(),
317 commit,
318 })
319 }
320 "#account" => Ok(FirehoseMessage::Account(
321 serde_ipld_dagcbor::from_slice(message_frame.body.as_slice())
322 .map_err(|e| Error::DagCborDecodeError(e, message_frame))?,
323 )),
324
325 "#identity" => Ok(FirehoseMessage::Identity(
326 serde_ipld_dagcbor::from_slice(message_frame.body.as_slice())
327 .map_err(|e| Error::DagCborDecodeError(e, message_frame))?,
328 )),
329 t => Err(Error::UnknownFrameType(t.to_string(), message_frame))?,
330 },
331 crate::frame::Frame::Message(None, message_frame) => {
332 Err(Error::NoTypeInFrame(message_frame))
333 }
334 crate::frame::Frame::Error(error_frame) => Err(Error::FrameError(error_frame)),
335 }
336 }
337}