use chrono::{DateTime, Utc};
use serde_json::Value;
use crate::identity::SyncKey;
/// System that a change is attributed to.
///
/// The type is `Copy`, so it is passed and returned by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceSystem {
    /// The change is attributed to Salesforce.
    Salesforce,
    /// The change is attributed to Postgres.
    Postgres,
}

impl SourceSystem {
    /// Label returned for [`Self::Salesforce`].
    const SALESFORCE_LABEL: &'static str = "salesforce";
    /// Label returned for [`Self::Postgres`].
    const POSTGRES_LABEL: &'static str = "postgres";

    /// Returns the fixed lowercase label for this variant.
    ///
    /// The function is `const`, so the label can also be obtained in constant contexts.
    ///
    /// NOTE(review): the name implies this is the value written to the database; the
    /// storage layer is not visible from this file, so confirm before changing a label.
    #[must_use]
    pub const fn as_db_value(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must fail to compile here until it
        // has been given a label.
        match self {
            Self::Salesforce => Self::SALESFORCE_LABEL,
            Self::Postgres => Self::POSTGRES_LABEL,
        }
    }
}
/// Kind of change being described.
///
/// The type is `Copy`, so it is passed and returned by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    /// The record is to be inserted or updated.
    Upsert,
    /// The record is to be deleted.
    Delete,
}

impl ChangeOperation {
    /// Label returned for [`Self::Upsert`].
    const UPSERT_LABEL: &'static str = "upsert";
    /// Label returned for [`Self::Delete`].
    const DELETE_LABEL: &'static str = "delete";

    /// Returns the fixed lowercase label for this variant.
    ///
    /// The function is `const`, so the label can also be obtained in constant contexts.
    ///
    /// NOTE(review): the name implies this is the value written to the database; the
    /// storage layer is not visible from this file, so confirm before changing a label.
    #[must_use]
    pub const fn as_db_value(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must fail to compile here until it
        // has been given a label.
        match self {
            Self::Upsert => Self::UPSERT_LABEL,
            Self::Delete => Self::DELETE_LABEL,
        }
    }
}
/// Position marker that can be attached to a change.
///
/// Each variant wraps the raw cursor value in the shape its producer supplies; this
/// type performs no validation of the wrapped value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceCursor {
    /// Numeric Salesforce replay id.
    SalesforceReplayId(i64),
    /// Postgres LSN held as text.
    ///
    /// NOTE(review): the textual LSN format is not checked here; confirm against the
    /// producer.
    PostgresLsn(String),
    /// Snapshot watermark held as text.
    Snapshot(String),
}

impl SourceCursor {
    /// Renders the cursor as `<kind>:<value>`.
    ///
    /// The kind prefix is one of `salesforce-replay-id`, `postgres-lsn` or `snapshot`,
    /// and the wrapped value is appended verbatim using its `Display` output. A new
    /// `String` is allocated on every call.
    #[must_use]
    pub fn as_db_value(&self) -> String {
        // The text-carrying variants come first; the arms are independent, so their
        // order has no effect on the result.
        match self {
            Self::Snapshot(watermark) => format!("snapshot:{watermark}"),
            Self::PostgresLsn(lsn) => format!("postgres-lsn:{lsn}"),
            Self::SalesforceReplayId(replay_id) => format!("salesforce-replay-id:{replay_id}"),
        }
    }
}
/// A single observed change: which record, from which system, what kind of change,
/// when it was observed, and the JSON payload that came with it.
///
/// All fields are private; they are set through [`ChangeEnvelope::new`] and
/// [`ChangeEnvelope::with_cursor`] and read through the accessor methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChangeEnvelope {
    /// Identity of the record the change applies to.
    sync_key: SyncKey,
    /// System the change is attributed to.
    source: SourceSystem,
    /// Kind of change.
    operation: ChangeOperation,
    /// Optional position marker; `None` until [`ChangeEnvelope::with_cursor`] is called.
    cursor: Option<SourceCursor>,
    /// UTC timestamp supplied by the caller at construction.
    observed_at: DateTime<Utc>,
    /// JSON payload supplied by the caller at construction.
    payload: Value,
}

impl ChangeEnvelope {
    /// Creates an envelope with no cursor attached.
    ///
    /// Every argument is stored as given; use [`Self::with_cursor`] to attach a cursor
    /// afterwards.
    #[must_use]
    pub const fn new(
        sync_key: SyncKey,
        source: SourceSystem,
        operation: ChangeOperation,
        observed_at: DateTime<Utc>,
        payload: Value,
    ) -> Self {
        Self {
            cursor: None,
            sync_key,
            source,
            operation,
            observed_at,
            payload,
        }
    }

    /// Consumes the envelope and returns it with `cursor` attached.
    ///
    /// Any cursor that was already attached is replaced.
    #[must_use]
    pub fn with_cursor(self, cursor: SourceCursor) -> Self {
        Self {
            cursor: Some(cursor),
            ..self
        }
    }

    /// Returns the identity of the record this change applies to.
    #[must_use]
    pub const fn sync_key(&self) -> &SyncKey {
        &self.sync_key
    }

    /// Returns the system this change is attributed to.
    #[must_use]
    pub const fn source(&self) -> SourceSystem {
        self.source
    }

    /// Returns the kind of change.
    #[must_use]
    pub const fn operation(&self) -> ChangeOperation {
        self.operation
    }

    /// Returns the attached cursor, or `None` if no cursor has been attached.
    #[must_use]
    pub const fn cursor(&self) -> Option<&SourceCursor> {
        self.cursor.as_ref()
    }

    /// Returns the timestamp supplied at construction.
    #[must_use]
    pub const fn observed_at(&self) -> DateTime<Utc> {
        self.observed_at
    }

    /// Returns the JSON payload supplied at construction.
    #[must_use]
    pub const fn payload(&self) -> &Value {
        &self.payload
    }

    /// Returns the 32-byte hash of this envelope's payload.
    ///
    /// Delegates to the module-level `payload_hash` function; the hash is recomputed
    /// on every call and is not cached.
    #[must_use]
    pub fn payload_hash(&self) -> [u8; 32] {
        let own_payload = self.payload();
        payload_hash(own_payload)
    }

    /// Returns `true` when `other` hashes to the same value as this envelope's payload.
    ///
    /// Both sides go through the same module-level `payload_hash` function, so the
    /// comparison is on hashes rather than on the `Value`s themselves.
    #[must_use]
    pub fn payload_hash_matches(&self, other: &Value) -> bool {
        let own_hash = self.payload_hash();
        let other_hash = payload_hash(other);
        own_hash == other_hash
    }
}
/// Computes a 32-byte BLAKE3 hash of `payload` after canonicalising object key order.
///
/// The input is not modified: a clone of it has `sort_all_objects` applied, is rendered
/// to a string through `Value`'s `Display` implementation, and the UTF-8 bytes of that
/// string are hashed. Payloads that differ only in the key order of their objects
/// therefore produce the same hash.
#[must_use]
pub fn payload_hash(payload: &Value) -> [u8; 32] {
    // Canonicalise a private copy so the caller's value is left untouched.
    let canonical_json = {
        let mut sorted = payload.clone();
        sorted.sort_all_objects();
        sorted.to_string()
    };
    let digest = blake3::hash(canonical_json.as_bytes());
    *digest.as_bytes()
}
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use serde_json::{json, Value};

    use super::{payload_hash, ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem};
    use crate::identity::SyncKey;

    /// Builds the sync key used by every envelope in these tests, failing the test if
    /// construction is rejected.
    fn account_sync_key() -> SyncKey {
        match SyncKey::new("tenant", "Account", "abc") {
            Ok(sync_key) => sync_key,
            Err(error) => panic!("unexpected sync key construction error: {error}"),
        }
    }

    /// Builds a Salesforce upsert envelope observed "now" that carries `payload`.
    fn salesforce_upsert(payload: Value) -> ChangeEnvelope {
        ChangeEnvelope::new(
            account_sync_key(),
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            Utc::now(),
            payload,
        )
    }

    /// Two envelopes built from equal payloads must report equal payload hashes, even
    /// though their observation timestamps may differ.
    #[test]
    fn change_envelope_payload_hash_is_stable_for_identical_payloads() {
        let payload = json!({"Name": "Acme"});

        let first = salesforce_upsert(payload.clone());
        let second = salesforce_upsert(payload);

        assert_eq!(first.payload_hash(), second.payload_hash());
    }

    /// Hashing a payload must give the same result as hashing an already-sorted copy,
    /// including for objects nested inside objects and arrays.
    #[test]
    fn payload_hash_matches_pre_sorted_payloads() {
        let unsorted = json!({
            "outer": {
                "zeta": 1,
                "alpha": 2
            },
            "items": [
                {
                    "delta": 4,
                    "beta": 3
                }
            ]
        });
        let sorted = {
            let mut value = unsorted.clone();
            value.sort_all_objects();
            value
        };

        assert_eq!(payload_hash(&unsorted), payload_hash(&sorted));
    }

    /// `with_cursor` must make the supplied cursor visible through `cursor()`.
    #[test]
    fn change_envelope_with_cursor_attaches_cursor() {
        let envelope = salesforce_upsert(json!({"Name": "Acme"}))
            .with_cursor(SourceCursor::SalesforceReplayId(42));

        assert_eq!(
            envelope.cursor(),
            Some(&SourceCursor::SalesforceReplayId(42))
        );
    }

    /// A payload with the same entries written in a different key order must match.
    #[test]
    fn change_envelope_payload_hash_matches_semantically_equal_payloads() {
        let envelope = salesforce_upsert(json!({
            "Name": "Acme",
            "Description": "Keep"
        }));
        let reordered = json!({
            "Description": "Keep",
            "Name": "Acme"
        });

        assert!(envelope.payload_hash_matches(&reordered));
    }
}