use std::sync::Arc;
use sonic_rs;
use crate::types::{Lsn, TenantId, VShardId};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RowId(pub Arc<str>);
impl RowId {
pub fn new(id: impl Into<Arc<str>>) -> Self {
Self(id.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for RowId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
#[derive(Debug, Clone)]
pub struct WriteEvent {
pub sequence: u64,
pub collection: Arc<str>,
pub op: WriteOp,
pub row_id: RowId,
pub lsn: Lsn,
pub tenant_id: TenantId,
pub vshard_id: VShardId,
pub source: EventSource,
pub new_value: Option<Arc<[u8]>>,
pub old_value: Option<Arc<[u8]>>,
pub system_time_ms: Option<i64>,
pub valid_time_ms: Option<i64>,
pub user_id: Option<Arc<str>>,
pub statement_digest: Option<Arc<str>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteOp {
Insert,
Update,
Delete,
BulkInsert { count: u32 },
BulkDelete { count: u32 },
Heartbeat,
}
impl WriteOp {
pub fn is_data_event(&self) -> bool {
!matches!(self, Self::Heartbeat)
}
}
impl std::fmt::Display for WriteOp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Insert => write!(f, "INSERT"),
Self::Update => write!(f, "UPDATE"),
Self::Delete => write!(f, "DELETE"),
Self::BulkInsert { count } => write!(f, "BULK_INSERT({count})"),
Self::BulkDelete { count } => write!(f, "BULK_DELETE({count})"),
Self::Heartbeat => write!(f, "HEARTBEAT"),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventSource {
User,
Trigger,
RaftFollower,
CrdtSync,
Deferred,
}
impl std::fmt::Display for EventSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::User => write!(f, "user"),
Self::Trigger => write!(f, "trigger"),
Self::RaftFollower => write!(f, "raft_follower"),
Self::CrdtSync => write!(f, "crdt_sync"),
Self::Deferred => write!(f, "deferred"),
}
}
}
pub fn deserialize_event_payload(
bytes: &[u8],
) -> Option<serde_json::Map<String, serde_json::Value>> {
if let Ok(serde_json::Value::Object(map)) = nodedb_types::json_from_msgpack(bytes) {
return Some(map);
}
if let Ok(serde_json::Value::Object(map)) = sonic_rs::from_slice::<serde_json::Value>(bytes) {
return Some(map);
}
None
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn row_id_display() {
let id = RowId::new("doc-123");
assert_eq!(id.as_str(), "doc-123");
assert_eq!(id.to_string(), "doc-123");
}
#[test]
fn write_op_display() {
assert_eq!(WriteOp::Insert.to_string(), "INSERT");
assert_eq!(
WriteOp::BulkInsert { count: 42 }.to_string(),
"BULK_INSERT(42)"
);
}
#[test]
fn event_source_display() {
assert_eq!(EventSource::User.to_string(), "user");
assert_eq!(EventSource::RaftFollower.to_string(), "raft_follower");
}
#[test]
fn write_event_construction() {
let event = WriteEvent {
sequence: 1,
collection: Arc::from("orders"),
op: WriteOp::Insert,
row_id: RowId::new("order-1"),
lsn: Lsn::new(100),
tenant_id: TenantId::new(1),
vshard_id: VShardId::new(0),
source: EventSource::User,
new_value: Some(Arc::from(b"payload".as_slice())),
old_value: None,
system_time_ms: None,
valid_time_ms: None,
user_id: None,
statement_digest: None,
};
assert_eq!(event.sequence, 1);
assert_eq!(event.op, WriteOp::Insert);
assert!(event.new_value.is_some());
assert!(event.old_value.is_none());
}
}