use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use tungstenite::protocol::frame::Utf8Bytes;
use crate::Error;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
pub enum RecordAction {
Create,
Update,
Delete,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
pub enum AccountStatus {
Active,
Takendown,
Suspended,
Deactivated,
Deleted,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
pub enum RepoState {
Pending,
Desynchronized,
Resyncing,
Active,
Takendown,
Suspended,
Deactivated,
Error,
}
pub struct RecordEvent {
pub id: u64,
pub live: bool,
pub did: String,
pub rev: String,
pub collection: String,
pub rkey: String,
pub action: RecordAction,
pub cid: Option<String>,
json: Option<Utf8Bytes>,
record_offset: usize,
record_len: usize,
}
impl RecordEvent {
pub fn record_as_str(&self) -> Option<&str> {
self.json
.as_ref()
.map(|j| &j.as_str()[self.record_offset..self.record_offset + self.record_len])
}
pub fn deserialize_as<T>(&self) -> Result<T, Error>
where
for<'de> T: Deserialize<'de>,
{
self.record_as_str()
.map_or(Err(Error::NoRecordPresent), |s| {
serde_json::from_str(s).map_err(Into::into)
})
}
}
#[derive(Debug, Clone)]
pub struct IdentityEvent {
pub id: u64,
pub did: String,
pub handle: String,
pub is_active: bool,
pub status: AccountStatus,
}
#[non_exhaustive]
pub enum Event {
Record(RecordEvent),
Identity(IdentityEvent),
}
impl Event {
pub fn id(&self) -> u64 {
match self {
Event::Record(e) => e.id,
Event::Identity(e) => e.id,
}
}
pub fn did(&self) -> &str {
match self {
Event::Record(e) => &e.did,
Event::Identity(e) => &e.did,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepoInfo {
pub did: String,
pub handle: String,
pub state: RepoState,
pub rev: String,
pub error: String,
pub retries: u32,
pub records: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Cursors {
pub firehose: Option<i64>,
pub list_repos: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DidDocument {
pub id: String,
#[serde(default, rename = "alsoKnownAs")]
pub also_known_as: Vec<String>,
#[serde(default, rename = "verificationMethod")]
pub verification_method: Vec<VerificationMethod>,
#[serde(default)]
pub service: Vec<Service>,
#[serde(flatten)]
pub extra: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationMethod {
pub id: String,
#[serde(rename = "type")]
pub type_: String,
pub controller: String,
#[serde(rename = "publicKeyMultibase")]
pub public_key_multibase: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Service {
pub id: String,
#[serde(rename = "type")]
pub type_: String,
#[serde(rename = "serviceEndpoint")]
pub service_endpoint: String,
}
#[derive(Deserialize)]
#[serde(bound(deserialize = "'de: 'a"))]
pub(crate) struct RawEvent<'a> {
pub id: u64,
#[serde(rename = "type")]
pub type_: String,
pub identity: Option<RawIdentityEvent>,
pub record: Option<RawRecordEvent<'a>>,
}
#[derive(Deserialize)]
#[serde(bound(deserialize = "'de: 'a"))]
pub(crate) struct RawRecordEvent<'a> {
pub live: bool,
pub did: String,
pub rev: String,
pub collection: String,
pub rkey: String,
pub action: RecordAction,
pub cid: Option<String>,
pub record: Option<&'a RawValue>,
}
#[derive(Deserialize, Clone)]
pub(crate) struct RawIdentityEvent {
pub did: String,
pub handle: String,
#[serde(rename = "is_active")]
pub is_active: bool,
pub status: AccountStatus,
}
impl RawEvent<'_> {
pub fn into_event(self, json: Utf8Bytes) -> Option<Event> {
match self.type_.as_str() {
"record" => {
let r = self.record?;
let (json, record_offset, record_len) = if let Some(rv) = r.record.as_ref() {
let json_str = json.as_str();
let rv_str = rv.get();
let offset = rv_str.as_ptr() as usize - json_str.as_ptr() as usize;
(Some(json), offset, rv_str.len())
} else {
(None, 0, 0)
};
Some(Event::Record(RecordEvent {
id: self.id,
live: r.live,
did: r.did,
rev: r.rev,
collection: r.collection,
rkey: r.rkey,
action: r.action,
cid: r.cid,
json,
record_offset,
record_len,
}))
}
"identity" => {
let i = self.identity?;
Some(Event::Identity(IdentityEvent {
id: self.id,
did: i.did,
handle: i.handle,
is_active: i.is_active,
status: i.status,
}))
}
_ => None,
}
}
}
#[derive(Deserialize)]
pub(crate) struct RepoCountResponse {
pub repo_count: u64,
}
#[derive(Deserialize)]
pub(crate) struct RecordCountResponse {
pub record_count: u64,
}
#[derive(Deserialize)]
pub(crate) struct OutboxBufferResponse {
pub outbox_buffer: u64,
}
#[derive(Deserialize)]
pub(crate) struct ResyncBufferResponse {
pub resync_buffer: u64,
}
#[derive(Deserialize)]
pub(crate) struct ApiError {
pub message: String,
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
macro_rules! assert_deserialize {
($type:ty, $($json:literal => $variant:expr),+ $(,)?) => {
$(
assert_eq!(
serde_json::from_str::<$type>($json).unwrap(),
$variant
);
)+
};
}
#[test]
fn record_action_deserialize() {
assert_deserialize!(RecordAction,
r#""create""# => RecordAction::Create,
r#""update""# => RecordAction::Update,
r#""delete""# => RecordAction::Delete,
);
}
#[test]
fn account_status_deserialize() {
assert_deserialize!(AccountStatus,
r#""active""# => AccountStatus::Active,
r#""takendown""# => AccountStatus::Takendown,
r#""suspended""# => AccountStatus::Suspended,
r#""deactivated""# => AccountStatus::Deactivated,
r#""deleted""# => AccountStatus::Deleted,
);
}
#[test]
fn repo_state_deserialize() {
assert_deserialize!(RepoState,
r#""pending""# => RepoState::Pending,
r#""active""# => RepoState::Active,
r#""error""# => RepoState::Error,
);
}
#[test]
fn repo_info_deserialize() {
let json = json!({
"did": "did:plc:abc123",
"handle": "test.bsky.social",
"state": "active",
"rev": "3abc123",
"error": "",
"retries": 0,
"records": 42
});
let info: RepoInfo = serde_json::from_value(json).unwrap();
assert_eq!(info.did, "did:plc:abc123");
assert_eq!(info.handle, "test.bsky.social");
assert_eq!(info.state, RepoState::Active);
assert_eq!(info.records, 42);
}
#[test]
fn cursors_deserialize() {
let json = json!({
"firehose": 12345678,
"list_repos": "some-cursor"
});
let cursors: Cursors = serde_json::from_value(json).unwrap();
assert_eq!(cursors.firehose, Some(12345678));
assert_eq!(cursors.list_repos, Some("some-cursor".to_string()));
}
#[test]
fn cursors_deserialize_nulls() {
let json = json!({
"firehose": null,
"list_repos": null
});
let cursors: Cursors = serde_json::from_value(json).unwrap();
assert_eq!(cursors.firehose, None);
assert_eq!(cursors.list_repos, None);
}
#[test]
fn did_document_deserialize() {
let json = json!({
"id": "did:plc:example1234567890abc",
"alsoKnownAs": ["at://alice.test"],
"verificationMethod": [{
"id": "did:plc:example1234567890abc#atproto",
"type": "Multikey",
"controller": "did:plc:example1234567890abc",
"publicKeyMultibase": "zDnaekeGCpVsdvDCrGNa9t3bXYUs45MHX1hLwqvaKLtPU9m7X"
}],
"service": [{
"id": "#atproto_pds",
"type": "AtprotoPersonalDataServer",
"serviceEndpoint": "https://pds.example.com"
}]
});
let doc: DidDocument = serde_json::from_value(json).unwrap();
assert_eq!(doc.id, "did:plc:example1234567890abc");
assert_eq!(doc.also_known_as, vec!["at://alice.test"]);
assert_eq!(doc.verification_method.len(), 1);
assert_eq!(doc.verification_method[0].type_, "Multikey");
assert_eq!(doc.service.len(), 1);
assert_eq!(doc.service[0].type_, "AtprotoPersonalDataServer");
}
#[test]
fn did_document_with_extra_fields() {
let json = json!({
"id": "did:plc:test",
"alsoKnownAs": [],
"@context": ["https://www.w3.org/ns/did/v1"],
"customField": "some value"
});
let doc: DidDocument = serde_json::from_value(json).unwrap();
assert_eq!(doc.id, "did:plc:test");
assert!(doc.extra.get("@context").is_some());
assert!(doc.extra.get("customField").is_some());
}
#[test]
fn raw_record_event_deserialize() {
let json = json!({
"id": 12345,
"type": "record",
"record": {
"live": true,
"did": "did:plc:abc123",
"rev": "3abc",
"collection": "app.bsky.feed.post",
"rkey": "3def",
"action": "create",
"cid": "bafyreid...",
"record": {
"$type": "app.bsky.feed.post",
"text": "Hello!"
}
}
})
.to_string();
let json: Utf8Bytes = json.into();
let raw: RawEvent = serde_json::from_str(json.as_str()).unwrap();
assert_eq!(raw.id, 12345);
assert_eq!(raw.type_, "record");
let event = raw.into_event(json.clone()).unwrap();
match event {
Event::Record(r) => {
assert_eq!(r.id, 12345);
assert!(r.live);
assert_eq!(r.did, "did:plc:abc123");
assert_eq!(r.collection, "app.bsky.feed.post");
assert_eq!(r.action, RecordAction::Create);
}
_ => panic!("Expected Record event"),
}
}
#[test]
fn raw_identity_event_deserialize() {
let json: Utf8Bytes = json!({
"id": 99999,
"type": "identity",
"identity": {
"did": "did:plc:xyz789",
"handle": "alice.bsky.social",
"is_active": true,
"status": "active"
}
})
.to_string()
.into();
let raw: RawEvent = serde_json::from_str(json.as_str()).unwrap();
let event = raw.into_event(json.clone()).unwrap();
match event {
Event::Identity(i) => {
assert_eq!(i.id, 99999);
assert_eq!(i.did, "did:plc:xyz789");
assert_eq!(i.handle, "alice.bsky.social");
assert!(i.is_active);
assert_eq!(i.status, AccountStatus::Active);
}
_ => panic!("Expected Identity event"),
}
}
#[test]
fn raw_delete_event_no_record() {
let json: Utf8Bytes = json!({
"id": 55555,
"type": "record",
"record": {
"live": false,
"did": "did:plc:deleted",
"rev": "3xyz",
"collection": "app.bsky.feed.post",
"rkey": "3abc",
"action": "delete",
"cid": null,
"record": null
}
})
.to_string()
.into();
let raw: RawEvent = serde_json::from_str(json.as_str()).unwrap();
let event = raw.into_event(json.clone()).unwrap();
match event {
Event::Record(r) => {
assert_eq!(r.action, RecordAction::Delete);
assert!(r.cid.is_none());
assert!(r.record_as_str().is_none());
}
_ => panic!("Expected Record event"),
}
}
#[test]
fn event_helper_methods() {
let record_event = Event::Record(RecordEvent {
id: 123,
live: true,
did: "did:plc:record".to_string(),
rev: "abc".to_string(),
collection: "test".to_string(),
rkey: "key".to_string(),
action: RecordAction::Create,
cid: None,
json: None,
record_offset: 0,
record_len: 0,
});
assert_eq!(record_event.id(), 123);
assert_eq!(record_event.did(), "did:plc:record");
let identity_event = Event::Identity(IdentityEvent {
id: 456,
did: "did:plc:identity".to_string(),
handle: "test".to_string(),
is_active: true,
status: AccountStatus::Active,
});
assert_eq!(identity_event.id(), 456);
assert_eq!(identity_event.did(), "did:plc:identity");
}
}