use chrono::{DateTime, Utc};
use serde_json::Value;
use crate::identity::SyncKey;
/// Upstream system a change was read from.
///
/// The type is `Copy`, so it is passed and returned by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceSystem {
    /// The change came from Salesforce.
    Salesforce,
    /// The change came from Postgres.
    Postgres,
}

impl SourceSystem {
    /// Returns the fixed lowercase identifier for this source system.
    ///
    /// The mapping is `Salesforce -> "salesforce"` and `Postgres -> "postgres"`.
    /// The function is `const`, so it can be evaluated at compile time.
    // NOTE(review): the name suggests this string is what gets persisted in a
    // database column; confirm against the storage layer before changing it.
    #[must_use]
    pub const fn as_db_value(self) -> &'static str {
        match self {
            Self::Postgres => "postgres",
            Self::Salesforce => "salesforce",
        }
    }
}
/// Kind of mutation a change represents.
///
/// The type is `Copy`, so it is passed and returned by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    /// The record was created or updated.
    Upsert,
    /// The record was removed.
    Delete,
}

impl ChangeOperation {
    /// Returns the fixed lowercase identifier for this operation.
    ///
    /// The mapping is `Upsert -> "upsert"` and `Delete -> "delete"`.
    /// The function is `const`, so it can be evaluated at compile time.
    // NOTE(review): the name suggests this string is what gets persisted in a
    // database column; confirm against the storage layer before changing it.
    #[must_use]
    pub const fn as_db_value(self) -> &'static str {
        match self {
            Self::Delete => "delete",
            Self::Upsert => "upsert",
        }
    }
}
/// Position marker attached to a change, tagged by the kind of source it
/// came from.
///
/// The string-carrying variants store their value verbatim; no validation or
/// normalisation happens in this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceCursor {
    /// Numeric Salesforce replay id.
    SalesforceReplayId(i64),
    /// Postgres LSN, kept as the string it was supplied as.
    PostgresLsn(String),
    /// Snapshot watermark, kept as the string it was supplied as.
    Snapshot(String),
}

impl SourceCursor {
    /// Renders the cursor as a single prefixed string.
    ///
    /// | Variant                 | Output                         |
    /// |-------------------------|--------------------------------|
    /// | `SalesforceReplayId(n)` | `salesforce-replay-id:<n>`     |
    /// | `PostgresLsn(s)`        | `postgres-lsn:<s>`             |
    /// | `Snapshot(s)`           | `snapshot:<s>`                 |
    ///
    /// The inner value is appended unchanged, so an empty string yields just
    /// the prefix. A new `String` is allocated on every call.
    #[must_use]
    pub fn as_db_value(&self) -> String {
        // `format!` and slice `concat` size the buffer themselves, so no
        // hand-counted prefix lengths have to be kept in sync with the
        // literals below.
        match self {
            Self::SalesforceReplayId(replay_id) => format!("salesforce-replay-id:{replay_id}"),
            Self::PostgresLsn(lsn) => ["postgres-lsn:", lsn.as_str()].concat(),
            Self::Snapshot(watermark) => ["snapshot:", watermark.as_str()].concat(),
        }
    }
}
/// One observed change together with the metadata describing where it came
/// from.
///
/// All fields are private. An envelope is built with [`ChangeEnvelope::new`],
/// optionally given a cursor with [`ChangeEnvelope::with_cursor`], and read
/// back through the accessor methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChangeEnvelope {
    /// Key supplied at construction; see [`SyncKey`].
    sync_key: SyncKey,
    /// System the change was read from.
    source: SourceSystem,
    /// Whether the change is an upsert or a delete.
    operation: ChangeOperation,
    /// Optional source position; `None` until `with_cursor` is called.
    cursor: Option<SourceCursor>,
    /// Timestamp supplied at construction.
    observed_at: DateTime<Utc>,
    /// JSON body of the change.
    payload: Value,
}

impl ChangeEnvelope {
    /// Creates an envelope with no cursor attached.
    ///
    /// Every argument is stored as given; use [`ChangeEnvelope::with_cursor`]
    /// to attach a cursor afterwards.
    #[must_use]
    pub const fn new(
        sync_key: SyncKey,
        source: SourceSystem,
        operation: ChangeOperation,
        observed_at: DateTime<Utc>,
        payload: Value,
    ) -> Self {
        Self {
            cursor: None,
            sync_key,
            source,
            operation,
            observed_at,
            payload,
        }
    }

    /// Consumes the envelope and returns it with `cursor` attached.
    ///
    /// Any cursor that was already present is replaced.
    #[must_use]
    pub fn with_cursor(self, cursor: SourceCursor) -> Self {
        Self {
            cursor: Some(cursor),
            ..self
        }
    }

    /// Borrows the sync key given at construction.
    #[must_use]
    pub const fn sync_key(&self) -> &SyncKey {
        &self.sync_key
    }

    /// Returns the source system given at construction.
    #[must_use]
    pub const fn source(&self) -> SourceSystem {
        self.source
    }

    /// Returns the operation given at construction.
    #[must_use]
    pub const fn operation(&self) -> ChangeOperation {
        self.operation
    }

    /// Borrows the attached cursor, or returns `None` when none was set.
    #[must_use]
    pub const fn cursor(&self) -> Option<&SourceCursor> {
        self.cursor.as_ref()
    }

    /// Returns the timestamp given at construction.
    #[must_use]
    pub const fn observed_at(&self) -> DateTime<Utc> {
        self.observed_at
    }

    /// Borrows the JSON payload given at construction.
    #[must_use]
    pub const fn payload(&self) -> &Value {
        &self.payload
    }

    /// Hashes this envelope's payload with the module-level [`payload_hash`].
    ///
    /// Only the payload contributes; the other fields are not hashed.
    #[must_use]
    pub fn payload_hash(&self) -> [u8; 32] {
        payload_hash(self.payload())
    }

    /// Reports whether `other` hashes to the same value as this envelope's
    /// payload.
    #[must_use]
    pub fn payload_hash_matches(&self, other: &Value) -> bool {
        let theirs = payload_hash(other);
        let ours = self.payload_hash();
        ours == theirs
    }
}
/// Computes a 32-byte BLAKE3 digest of `payload`.
///
/// The bytes fed to the hasher are produced by `hash_json_value`, which sorts
/// object keys before emitting them. Two payloads that differ only in object
/// key order therefore hash to the same value.
#[must_use]
pub fn payload_hash(payload: &Value) -> [u8; 32] {
    let mut hasher = blake3::Hasher::new();
    hash_json_value(payload, &mut hasher);
    let digest = hasher.finalize();
    *digest.as_bytes()
}
/// Streams a key-order-independent JSON rendering of `value` into `hasher`.
///
/// * Objects are written as `{"key":value,...}` with members ordered by key.
/// * Arrays are written as `[value,...]` in their existing order.
/// * Every other value is serialised with `serde_json::to_writer`.
///
/// Nested objects and arrays are handled by recursion. No whitespace is
/// written between the structural bytes this function emits itself.
fn hash_json_value(value: &Value, hasher: &mut blake3::Hasher) {
    match value {
        Value::Array(elements) => {
            hasher.update(b"[");
            for (position, element) in elements.iter().enumerate() {
                if position > 0 {
                    hasher.update(b",");
                }
                hash_json_value(element, hasher);
            }
            hasher.update(b"]");
        }
        Value::Object(members) => {
            hasher.update(b"{");
            // Order members by key so the digest does not depend on the
            // map's own iteration order.
            let mut entries: Vec<(&String, &Value)> = members.iter().collect();
            entries.sort_unstable_by(|left, right| left.0.cmp(right.0));
            for (position, (key, member)) in entries.into_iter().enumerate() {
                if position > 0 {
                    hasher.update(b",");
                }
                // The serialisation result is deliberately discarded.
                // NOTE(review): presumably this cannot fail because the
                // hasher's writer never errors; confirm.
                let _ = serde_json::to_writer(&mut *hasher, key);
                hasher.update(b":");
                hash_json_value(member, hasher);
            }
            hasher.update(b"}");
        }
        scalar => {
            // Same deliberate discard as for object keys above.
            let _ = serde_json::to_writer(hasher, scalar);
        }
    }
}
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use serde_json::{json, Value};

    use super::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem};
    use crate::identity::SyncKey;

    /// Builds the sync key shared by every envelope fixture in this module.
    fn account_sync_key() -> SyncKey {
        match SyncKey::new("tenant", "Account", "abc") {
            Ok(sync_key) => sync_key,
            Err(error) => panic!("unexpected sync key construction error: {error}"),
        }
    }

    /// Wraps `payload` in a Salesforce upsert envelope timestamped with the
    /// current time.
    fn salesforce_upsert(sync_key: SyncKey, payload: Value) -> ChangeEnvelope {
        ChangeEnvelope::new(
            sync_key,
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            Utc::now(),
            payload,
        )
    }

    #[test]
    fn change_envelope_payload_hash_is_stable_for_identical_payloads() {
        let sync_key = account_sync_key();
        let payload = json!({"Name": "Acme"});

        let first = salesforce_upsert(sync_key.clone(), payload.clone());
        let second = salesforce_upsert(sync_key, payload);

        assert_eq!(first.payload_hash(), second.payload_hash());
    }

    #[test]
    fn payload_hash_matches_pre_sorted_payloads() {
        let payload = json!({
            "outer": {
                "zeta": 1,
                "alpha": 2
            },
            "items": [
                {
                    "delta": 4,
                    "beta": 3
                }
            ]
        });
        let sorted_payload = {
            let mut copy = payload.clone();
            copy.sort_all_objects();
            copy
        };

        assert_eq!(
            super::payload_hash(&payload),
            super::payload_hash(&sorted_payload)
        );
    }

    #[test]
    fn change_envelope_with_cursor_attaches_cursor() {
        let envelope = salesforce_upsert(account_sync_key(), json!({"Name": "Acme"}))
            .with_cursor(SourceCursor::SalesforceReplayId(42));

        assert!(matches!(
            envelope.cursor(),
            Some(SourceCursor::SalesforceReplayId(42))
        ));
    }

    #[test]
    fn change_envelope_payload_hash_matches_semantically_equal_payloads() {
        let envelope = salesforce_upsert(
            account_sync_key(),
            json!({
                "Name": "Acme",
                "Description": "Keep"
            }),
        );
        let reordered = json!({
            "Description": "Keep",
            "Name": "Acme"
        });

        assert!(envelope.payload_hash_matches(&reordered));
    }
}