use super::sync_types::*;
use crate::error::Result;
/// HTTP client that synchronizes a local `EmbeddedCore` with a remote sync server.
///
/// Requests are sent as JSON `POST`s to the `/api/v1/sync/pull` and
/// `/api/v1/sync/push` endpoints under `base_url`.
pub struct SyncClient {
/// Server base URL; `new` strips any trailing `/` before storing it.
base_url: String,
/// Identifier sent as `node_id` in every pull and push request.
node_id: String,
/// `reqwest` client used for all outgoing requests.
http: reqwest::Client,
}
impl SyncClient {
pub fn new(base_url: impl Into<String>, node_id: impl Into<String>) -> Self {
Self {
base_url: base_url.into().trim_end_matches('/').to_string(),
node_id: node_id.into(),
http: reqwest::Client::new(),
}
}
pub async fn sync(&self, core: &super::core::EmbeddedCore) -> Result<SyncStats> {
let mut stats = SyncStats::default();
let pull_stats = self.pull(core).await?;
stats.pulled = pull_stats.pulled;
stats.conflicts += pull_stats.conflicts;
let push_stats = self.push(core).await?;
stats.pushed = push_stats.pushed;
stats.conflicts += push_stats.conflicts;
Ok(stats)
}
async fn pull(&self, core: &super::core::EmbeddedCore) -> Result<SyncStats> {
let vv = core.version_vector();
let request = SyncPullRequest {
node_id: self.node_id.clone(),
version_vector: vv,
};
let url = format!("{}/api/v1/sync/pull", self.base_url);
let response: SyncPullResponse = self
.http
.post(&url)
.json(&request)
.send()
.await
.map_err(|e| {
crate::error::AllSourceError::InvalidInput(format!("sync pull HTTP failed: {e}"))
})?
.json()
.await
.map_err(|e| {
crate::error::AllSourceError::InvalidInput(format!(
"sync pull response parse failed: {e}"
))
})?;
let total = response.events.len();
let (accepted, skipped) = core.receive_sync_push(response.events).await?;
Ok(SyncStats {
pulled: accepted,
pushed: 0,
conflicts: skipped,
})
}
async fn push(&self, core: &super::core::EmbeddedCore) -> Result<SyncStats> {
let server_vv = self.fetch_server_version_vector().await?;
let events = core.events_for_sync(&server_vv).await?;
if events.is_empty() {
return Ok(SyncStats::default());
}
let request = SyncPushRequest {
node_id: self.node_id.clone(),
events,
version_vector: core.version_vector(),
};
let url = format!("{}/api/v1/sync/push", self.base_url);
let response: SyncPushResponse = self
.http
.post(&url)
.json(&request)
.send()
.await
.map_err(|e| {
crate::error::AllSourceError::InvalidInput(format!("sync push HTTP failed: {e}"))
})?
.json()
.await
.map_err(|e| {
crate::error::AllSourceError::InvalidInput(format!(
"sync push response parse failed: {e}"
))
})?;
Ok(SyncStats {
pushed: response.accepted,
pulled: 0,
conflicts: response.skipped,
})
}
async fn fetch_server_version_vector(
&self,
) -> Result<std::collections::BTreeMap<String, crate::infrastructure::cluster::hlc::HlcTimestamp>>
{
let request = SyncPullRequest {
node_id: self.node_id.clone(),
version_vector: std::collections::BTreeMap::new(),
};
let url = format!("{}/api/v1/sync/pull", self.base_url);
let response: SyncPullResponse = self
.http
.post(&url)
.json(&request)
.send()
.await
.map_err(|e| {
crate::error::AllSourceError::InvalidInput(format!(
"sync pull (VV fetch) HTTP failed: {e}"
))
})?
.json()
.await
.map_err(|e| {
crate::error::AllSourceError::InvalidInput(format!(
"sync pull (VV fetch) response parse failed: {e}"
))
})?;
Ok(response.version_vector)
}
}