use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tracing::{debug, info, warn};
use vti_common::audit::{
AuditEvent, AuditWriter, RegistryRecordPolicyOverrideData, RegistrySyncOutcomeData,
};
use vti_common::error::AppError;
use vti_common::store::KeyspaceHandle;
use super::client::{RegistryError, TrustRegistryClient};
use super::health::RegistryHealth;
use super::model::{RegistryRecord, SyncJob, SyncJobKind, SyncJobState};
use super::policy::{PublishOnJoinDecision, evaluate_publish_on_join};
use super::storage::{
delete_sync_job, get_sync_cursor, list_sync_jobs, set_sync_cursor, store_record, store_sync_job,
};
use super::tail::walk;
/// Default number of seconds between two syncer ticks.
pub const DEFAULT_TICK_INTERVAL_SECONDS: u64 = 5;
/// Background task that turns membership changes recorded in the audit log into
/// trust-registry calls.
///
/// Each tick walks the audit tail from a persisted cursor, enqueues sync jobs, and
/// dispatches the dispatchable ones through a [`TrustRegistryClient`], keeping a local
/// mirror of published records in `registry_records_ks`.
pub struct MembershipSyncer {
    // Audit log read by the tail walk.
    audit_ks: KeyspaceHandle,
    // Persistent queue of pending / in-flight / failed sync jobs.
    sync_queue_ks: KeyspaceHandle,
    // Position of the tail walk in the audit log.
    sync_cursor_ks: KeyspaceHandle,
    // Local mirror of what has been published to the registry.
    registry_records_ks: KeyspaceHandle,
    // Policy storage consulted for the `publish_on_join` decision and by the tail walk.
    policies_ks: KeyspaceHandle,
    active_policies_ks: KeyspaceHandle,
    // Registry backend the jobs are dispatched to.
    client: Arc<dyn TrustRegistryClient>,
    // Tracks registry reachability from call successes/failures.
    health: RegistryHealth,
    // When `None`, no audit events are emitted by the syncer.
    audit_writer: Option<AuditWriter>,
    // Actor recorded on the sync-outcome audit events and health transitions.
    actor_did: String,
    // Period between ticks of the `run` loop.
    tick_interval: Duration,
    // Passed through to the tail walk (defaults to 24 in `new`).
    rtbf_batch_window_hours: u64,
}
impl MembershipSyncer {
    /// Build a syncer over the given keyspaces, registry client and health tracker.
    ///
    /// `actor_did` is recorded as the actor on the `RegistrySync*` audit events and
    /// passed along to health-state transitions. The tick interval starts at
    /// [`DEFAULT_TICK_INTERVAL_SECONDS`] and the RTBF batch window at 24 hours; use
    /// [`Self::with_tick_interval`] and [`Self::with_rtbf_batch_window_hours`] to
    /// change them.
    // Each keyspace is a distinct handle the syncer reads or writes; bundling them into
    // a struct would only move this argument list somewhere else.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        audit_ks: KeyspaceHandle,
        sync_queue_ks: KeyspaceHandle,
        sync_cursor_ks: KeyspaceHandle,
        registry_records_ks: KeyspaceHandle,
        policies_ks: KeyspaceHandle,
        active_policies_ks: KeyspaceHandle,
        client: Arc<dyn TrustRegistryClient>,
        health: RegistryHealth,
        audit_writer: Option<AuditWriter>,
        actor_did: impl Into<String>,
    ) -> Self {
        Self {
            audit_ks,
            sync_queue_ks,
            sync_cursor_ks,
            registry_records_ks,
            policies_ks,
            active_policies_ks,
            client,
            health,
            audit_writer,
            actor_did: actor_did.into(),
            tick_interval: Duration::from_secs(DEFAULT_TICK_INTERVAL_SECONDS),
            rtbf_batch_window_hours: 24,
        }
    }

    /// Override the period between ticks.
    ///
    /// A zero period is replaced by the default when [`Self::run`] starts, because
    /// `tokio::time::interval` panics on a zero period.
    pub fn with_tick_interval(mut self, interval: Duration) -> Self {
        self.tick_interval = interval;
        self
    }

    /// Override the RTBF batch window (in hours) handed to the audit tail walk.
    pub fn with_rtbf_batch_window_hours(mut self, hours: u64) -> Self {
        self.rtbf_batch_window_hours = hours;
        self
    }

    /// Run the sync loop until `shutdown` changes or its sender is dropped.
    ///
    /// Jobs left `InFlight` by a previous process are reset to `Pending` first. Tick
    /// errors are logged and the loop continues. A tick that is already running is
    /// allowed to finish before the shutdown signal is observed.
    pub async fn run(self, mut shutdown: watch::Receiver<bool>) {
        // `tokio::time::interval` panics on a zero period, so never hand it one.
        let period = if self.tick_interval.is_zero() {
            warn!("zero tick interval configured — using the default interval instead");
            Duration::from_secs(DEFAULT_TICK_INTERVAL_SECONDS)
        } else {
            self.tick_interval
        };
        info!(
            tick_interval_secs = period.as_secs(),
            "membership-syncer task starting"
        );
        if let Err(e) = self.recover_in_flight().await {
            warn!(error = %e, "in-flight recovery failed at syncer boot");
        }
        let mut timer = tokio::time::interval(period);
        // If a tick overruns, reschedule from the late tick instead of bursting to catch up.
        timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // A tokio interval's first tick completes immediately; consume it so the first
        // sync pass happens one period after boot.
        timer.tick().await;
        loop {
            tokio::select! {
                _ = timer.tick() => {
                    if let Err(e) = self.tick().await {
                        warn!(error = %e, "syncer tick failed");
                    }
                }
                // `changed()` also resolves (with an error) when the sender is dropped;
                // both cases stop the task.
                _ = shutdown.changed() => {
                    debug!("membership-syncer task shutting down");
                    return;
                }
            }
        }
    }

    /// Run one sync pass.
    ///
    /// Walks the audit tail from the stored cursor, persists the cursor if it moved,
    /// emits the policy-override audit events reported by the walk, then dispatches
    /// every queued job that is dispatchable now.
    ///
    /// # Errors
    /// Returns errors from reading/writing the cursor, from the tail walk, and from
    /// listing the queue. Per-job dispatch failures are logged and recorded on the job
    /// row instead of being returned.
    pub async fn tick(&self) -> Result<(), AppError> {
        let cursor = get_sync_cursor(&self.sync_cursor_ks).await?;
        let outcome = walk(
            &self.audit_ks,
            &self.sync_queue_ks,
            &self.policies_ks,
            &self.active_policies_ks,
            self.rtbf_batch_window_hours,
            cursor,
        )
        .await?;
        if let Some(new) = outcome.new_cursor
            && Some(new) != cursor
        {
            set_sync_cursor(&self.sync_cursor_ks, new).await?;
        }
        if outcome.jobs_enqueued > 0 {
            debug!(jobs = outcome.jobs_enqueued, "tail walk enqueued jobs");
        }
        for ov in &outcome.overrides {
            self.emit_override(ov);
        }
        let jobs = list_sync_jobs(&self.sync_queue_ks).await?;
        let now = chrono::Utc::now();
        for job in jobs.into_iter().filter(|j| j.is_dispatchable(now)) {
            self.dispatch_one(job).await;
        }
        Ok(())
    }

    /// Emit a `RegistryRecordPolicyOverride` audit event for `ov`.
    ///
    /// No-op without an audit writer. The write runs in a detached task; failures are
    /// only logged.
    fn emit_override(&self, ov: &super::tail::OverrideEvent) {
        let Some(writer) = self.audit_writer.as_ref() else {
            return;
        };
        let payload = RegistryRecordPolicyOverrideData {
            reason: ov.reason.clone(),
            attempted_disposition: ov.attempted_disposition.clone(),
            effective_disposition: ov.effective_disposition.clone(),
        };
        let actor = ov.actor_did.clone();
        let target = ov.target_did.clone();
        let writer = writer.clone();
        tokio::spawn(async move {
            if let Err(e) = writer
                .write(
                    &actor,
                    Some(&target),
                    AuditEvent::RegistryRecordPolicyOverride(payload),
                )
                .await
            {
                warn!(error = %e, "failed to emit RegistryRecordPolicyOverride envelope");
            }
        });
    }

    /// Reset every `InFlight` job to `Pending` and return how many were reset.
    ///
    /// Called by [`Self::run`] before the first tick.
    ///
    /// # Errors
    /// Returns the first storage error; jobs reset before it stay reset.
    pub async fn recover_in_flight(&self) -> Result<usize, AppError> {
        let jobs = list_sync_jobs(&self.sync_queue_ks).await?;
        let mut recovered = 0_usize;
        for mut job in jobs {
            if job.state == SyncJobState::InFlight {
                job.state = SyncJobState::Pending;
                store_sync_job(&self.sync_queue_ks, &job).await?;
                recovered += 1;
                debug!(
                    job_id = %job.id,
                    did = %job.member_did,
                    "recovered InFlight job → Pending after restart"
                );
            }
        }
        if recovered > 0 {
            info!(
                recovered,
                "membership-syncer recovered InFlight jobs at boot"
            );
        }
        Ok(recovered)
    }

    /// Dispatch a single job to the registry and record the outcome.
    ///
    /// `PublishMember` jobs are first gated on the `publish_on_join` policy:
    /// - an explicit skip deletes the job;
    /// - an evaluation error leaves the stored row untouched so the job is
    ///   re-evaluated on the next tick.
    ///
    /// Otherwise the job is persisted as `InFlight` and the registry call is made. Then:
    /// - on success, the mirror is updated and the row deleted;
    /// - on a retriable error, the failure is recorded for retry;
    /// - on a permanent error, the job is marked `Failed`.
    async fn dispatch_one(&self, mut job: SyncJob) {
        if job.kind == SyncJobKind::PublishMember {
            match self.publish_on_join_decision().await {
                Some(PublishOnJoinDecision::PublishOnJoin) => {}
                Some(PublishOnJoinDecision::SkipPublishOnJoin) => {
                    self.discard_policy_skipped(job).await;
                    return;
                }
                // Undecidable policy: do not publish, but do not lose the job either.
                None => return,
            }
        }
        job.state = SyncJobState::InFlight;
        if let Err(e) = store_sync_job(&self.sync_queue_ks, &job).await {
            warn!(error = %e, job_id = %job.id, "failed to persist InFlight transition");
            return;
        }
        // Build the record once so the registry and the local mirror get identical
        // timestamps.
        let record = Self::record_for(&job);
        match self.run_call(&job, record.as_ref()).await {
            Ok(()) => {
                job.record_success();
                self.update_mirror(&job, record.as_ref()).await;
                self.health
                    .record_success(self.audit_writer.as_ref(), &self.actor_did)
                    .await;
                self.emit_outcome(&job, true);
                if let Err(e) = delete_sync_job(&self.sync_queue_ks, job.id).await {
                    warn!(error = %e, job_id = %job.id, "failed to delete completed job row");
                }
            }
            Err(e) if e.is_retriable() => {
                let reason = e.to_string();
                job.record_failure(reason.clone());
                self.health
                    .record_failure(reason, self.audit_writer.as_ref(), &self.actor_did)
                    .await;
                if let Err(s) = store_sync_job(&self.sync_queue_ks, &job).await {
                    warn!(error = %s, job_id = %job.id, "failed to persist retry state");
                }
                // `record_failure` may have exhausted the retry budget.
                if job.state == SyncJobState::Failed {
                    self.emit_outcome(&job, false);
                }
            }
            Err(e) => {
                job.attempts += 1;
                job.last_attempted_at = Some(chrono::Utc::now());
                job.last_error = Some(e.to_string());
                job.state = SyncJobState::Failed;
                if let Err(s) = store_sync_job(&self.sync_queue_ks, &job).await {
                    warn!(error = %s, job_id = %job.id, "failed to persist Failed state");
                }
                self.emit_outcome(&job, false);
                warn!(
                    job_id = %job.id,
                    did = %job.member_did,
                    kind = job.kind.as_str(),
                    error = %e,
                    "registry rejected sync job permanently — operator intervention required"
                );
            }
        }
    }

    /// Delete a `PublishMember` job that the active policy says not to publish.
    ///
    /// On success it emits a succeeded outcome. If the delete fails, the job stays
    /// queued and is reconsidered on the next tick.
    async fn discard_policy_skipped(&self, mut job: SyncJob) {
        if let Err(e) = delete_sync_job(&self.sync_queue_ks, job.id).await {
            warn!(
                error = %e,
                job_id = %job.id,
                "failed to delete policy-skipped PublishMember job — will retry next tick"
            );
            return;
        }
        job.record_success();
        self.emit_outcome(&job, true);
        debug!(
            job_id = %job.id,
            did = %job.member_did,
            "registry.rego.publish_on_join=false — skipping PublishMember"
        );
    }

    /// Evaluate the `publish_on_join` policy.
    ///
    /// Returns `None` (after logging) when the policy cannot be evaluated.
    async fn publish_on_join_decision(&self) -> Option<PublishOnJoinDecision> {
        match evaluate_publish_on_join(&self.policies_ks, &self.active_policies_ks).await {
            Ok(decision) => Some(decision),
            Err(e) => {
                warn!(
                    error = %e,
                    "publish_on_join evaluation failed — skipping publish until policy is fixed"
                );
                None
            }
        }
    }

    /// The registry record a job publishes.
    ///
    /// Returns `None` for `DeleteMember`, which removes the member instead of
    /// publishing a record.
    fn record_for(job: &SyncJob) -> Option<RegistryRecord> {
        match job.kind {
            SyncJobKind::PublishMember | SyncJobKind::UpdateMember => {
                Some(RegistryRecord::fresh_active(&job.member_did))
            }
            SyncJobKind::MarkDeparted => {
                let now = chrono::Utc::now();
                // Only a `historical` disposition sets `active_to` (to the departure time).
                let active_to = (job.disposition.as_deref() == Some("historical")).then_some(now);
                Some(RegistryRecord::departed(&job.member_did, now, active_to))
            }
            SyncJobKind::DeleteMember => None,
        }
    }

    /// Perform the registry call for `job`.
    ///
    /// Publishes `record` when there is one; otherwise deletes the member.
    async fn run_call(
        &self,
        job: &SyncJob,
        record: Option<&RegistryRecord>,
    ) -> Result<(), RegistryError> {
        match record {
            Some(record) => self.client.publish_member(record).await,
            None => self.client.delete_member(&job.member_did).await,
        }
    }

    /// Mirror a successful registry call into `registry_records_ks`.
    ///
    /// Stores `record` when there is one; otherwise deletes the member's mirror row.
    /// Failures are logged, not returned, since the registry call already succeeded.
    async fn update_mirror(&self, job: &SyncJob, record: Option<&RegistryRecord>) {
        match record {
            Some(record) => {
                if let Err(e) = store_record(&self.registry_records_ks, record).await {
                    warn!(error = %e, did = %job.member_did, "failed to update registry_records mirror");
                }
            }
            None => {
                if let Err(e) =
                    super::storage::delete_record(&self.registry_records_ks, &job.member_did).await
                {
                    warn!(error = %e, did = %job.member_did, "failed to delete registry_records mirror row");
                }
            }
        }
    }

    /// Emit `RegistrySyncSucceeded` / `RegistrySyncFailed` for `job`.
    ///
    /// No-op without an audit writer. `last_error` is only included on failure. The
    /// write runs in a detached task; failures are only logged.
    fn emit_outcome(&self, job: &SyncJob, succeeded: bool) {
        let Some(writer) = self.audit_writer.as_ref() else {
            return;
        };
        let payload = RegistrySyncOutcomeData {
            job_id: job.id.to_string(),
            kind: job.kind.as_str().to_string(),
            attempts: job.attempts,
            last_error: if succeeded {
                None
            } else {
                job.last_error.clone()
            },
        };
        let event = if succeeded {
            AuditEvent::RegistrySyncSucceeded(payload)
        } else {
            AuditEvent::RegistrySyncFailed(payload)
        };
        let actor_did = self.actor_did.clone();
        let target_did = job.member_did.clone();
        let writer = writer.clone();
        tokio::spawn(async move {
            if let Err(e) = writer.write(&actor_did, Some(&target_did), event).await {
                warn!(error = %e, "failed to emit RegistrySync outcome");
            }
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::registry::client::MockRegistryClient;
    use crate::registry::model::{SyncJob, SyncJobKind};
    use crate::registry::storage::{get_record, store_sync_job};
    use vti_common::audit::{AuditEvent, AuditKeyStore, AuditWriter, MemberAddedData};
    use vti_common::config::StoreConfig;
    use vti_common::store::Store;

    /// Build a syncer over a fresh on-disk store, a [`MockRegistryClient`] and an
    /// audit writer.
    ///
    /// Returns the syncer, a clone of the mock (used by tests to script failures and
    /// inspect calls), and the `TempDir` backing the store. Callers must keep the
    /// `TempDir` alive for the whole test.
    async fn fixture() -> (MembershipSyncer, MockRegistryClient, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = Store::open(&StoreConfig {
            data_dir: dir.path().to_path_buf(),
        })
        .unwrap();
        let audit_ks = store.keyspace("audit").unwrap();
        let audit_key_ks = store.keyspace("audit_key").unwrap();
        let sync_queue_ks = store.keyspace("sync_queue").unwrap();
        let sync_cursor_ks = store.keyspace("sync_cursor").unwrap();
        let registry_records_ks = store.keyspace("registry_records").unwrap();
        let policies_ks = store.keyspace("policies").unwrap();
        let active_policies_ks = store.keyspace("active_policies").unwrap();
        let key_store = AuditKeyStore::new(audit_key_ks);
        key_store.ensure_initial(&[0xAB; 32]).await.unwrap();
        let audit_writer = AuditWriter::new(audit_ks.clone(), key_store);
        let mock = MockRegistryClient::new();
        let client: Arc<dyn TrustRegistryClient> = Arc::new(mock.clone());
        let syncer = MembershipSyncer::new(
            audit_ks,
            sync_queue_ks,
            sync_cursor_ks,
            registry_records_ks,
            policies_ks,
            active_policies_ks,
            client,
            RegistryHealth::new(),
            Some(audit_writer),
            "did:webvh:vtc.example",
        );
        (syncer, mock, dir)
    }

    /// Append a `MemberAdded` audit event targeting `target` to `audit_ks`.
    ///
    /// Signs with a throwaway key store seeded with the same key bytes as
    /// [`fixture`], because the fixture does not expose its own writer.
    async fn write_member_added(audit_ks: &KeyspaceHandle, target: &str) {
        let audit_key_ks_dir = tempfile::tempdir().unwrap();
        let key_store_for_test = Store::open(&StoreConfig {
            data_dir: audit_key_ks_dir.path().to_path_buf(),
        })
        .unwrap();
        let aks = AuditKeyStore::new(key_store_for_test.keyspace("audit_key").unwrap());
        aks.ensure_initial(&[0xAB; 32]).await.unwrap();
        let w = AuditWriter::new(audit_ks.clone(), aks);
        w.write(
            "did:webvh:vtc.example",
            Some(target),
            AuditEvent::MemberAdded(MemberAddedData {
                role: "member".into(),
                via_join_request_id: None,
            }),
        )
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn happy_path_drains_one_publish_job() {
        let (syncer, mock, _dir) = fixture().await;
        write_member_added(&syncer.audit_ks, "did:key:zA").await;
        syncer.tick().await.unwrap();
        assert_eq!(mock.call_counts().await.publish, 1);
        let snap = mock.snapshot().await;
        assert!(snap.contains_key("did:key:zA"));
        let jobs = list_sync_jobs(&syncer.sync_queue_ks).await.unwrap();
        assert!(jobs.is_empty(), "completed jobs should be deleted");
        let mirror = get_record(&syncer.registry_records_ks, "did:key:zA")
            .await
            .unwrap();
        assert!(
            mirror.is_some(),
            "registry_records mirror should reflect the publish"
        );
        assert_eq!(
            syncer.health.status().await,
            crate::registry::HealthStatus::Active
        );
    }

    #[tokio::test]
    async fn transient_failure_bumps_attempts_and_keeps_job_pending() {
        let (syncer, mock, _dir) = fixture().await;
        write_member_added(&syncer.audit_ks, "did:key:zA").await;
        mock.fail_next_publish(RegistryError::Transient("flaky".into()))
            .await;
        syncer.tick().await.unwrap();
        let jobs = list_sync_jobs(&syncer.sync_queue_ks).await.unwrap();
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].state, SyncJobState::Pending);
        assert_eq!(jobs[0].attempts, 1);
        assert!(jobs[0].last_error.as_deref().unwrap().contains("flaky"));
    }

    #[tokio::test]
    async fn permanent_failure_flips_to_failed_immediately() {
        let (syncer, mock, _dir) = fixture().await;
        write_member_added(&syncer.audit_ks, "did:key:zA").await;
        mock.fail_next_publish(RegistryError::Permanent("bad input".into()))
            .await;
        syncer.tick().await.unwrap();
        let jobs = list_sync_jobs(&syncer.sync_queue_ks).await.unwrap();
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].state, SyncJobState::Failed);
        assert_eq!(jobs[0].attempts, 1, "single attempt then Failed");
    }

    #[tokio::test]
    async fn in_flight_rows_recover_to_pending_on_boot() {
        let (syncer, _mock, _dir) = fixture().await;
        let mut job = SyncJob::fresh(SyncJobKind::PublishMember, "did:key:zStranded");
        job.state = SyncJobState::InFlight;
        store_sync_job(&syncer.sync_queue_ks, &job).await.unwrap();
        let recovered = syncer.recover_in_flight().await.unwrap();
        assert_eq!(recovered, 1);
        let jobs = list_sync_jobs(&syncer.sync_queue_ks).await.unwrap();
        // Check the length first so a missing row fails with a clear message rather
        // than an index-out-of-bounds panic.
        assert_eq!(jobs.len(), 1, "the recovered job should still be queued");
        assert_eq!(jobs[0].state, SyncJobState::Pending);
    }

    #[tokio::test]
    async fn publish_on_join_false_skips_dispatch() {
        use crate::policy::{Policy, PolicyPurpose, set_active_policy_id, store_policy};
        use sha2::{Digest, Sha256};

        let (syncer, mock, _dir) = fixture().await;
        let src = "\
package vtc.registry
import rego.v1
default publish_on_join := false
";
        let sha: [u8; 32] = Sha256::digest(src.as_bytes()).into();
        let id = uuid::Uuid::new_v4();
        let policy = Policy {
            id,
            purpose: PolicyPurpose::Registry,
            rego_source: src.into(),
            sha256: sha,
            activated_at: Some(chrono::Utc::now()),
            author_did: "did:key:test".into(),
            created_at: chrono::Utc::now(),
            version: 1,
        };
        store_policy(&syncer.policies_ks, &policy).await.unwrap();
        set_active_policy_id(&syncer.active_policies_ks, PolicyPurpose::Registry, id)
            .await
            .unwrap();
        write_member_added(&syncer.audit_ks, "did:key:zSkip").await;
        syncer.tick().await.unwrap();
        assert_eq!(mock.call_counts().await.publish, 0);
        let jobs = list_sync_jobs(&syncer.sync_queue_ks).await.unwrap();
        assert!(
            jobs.is_empty(),
            "policy-skipped PublishMember job should be removed from the queue"
        );
    }

    #[tokio::test]
    async fn delete_member_job_drops_mirror_row() {
        let (syncer, _mock, _dir) = fixture().await;
        store_record(
            &syncer.registry_records_ks,
            &RegistryRecord::fresh_active("did:key:zDrop"),
        )
        .await
        .unwrap();
        let job = SyncJob::fresh(SyncJobKind::DeleteMember, "did:key:zDrop");
        store_sync_job(&syncer.sync_queue_ks, &job).await.unwrap();
        syncer.tick().await.unwrap();
        let mirror = get_record(&syncer.registry_records_ks, "did:key:zDrop")
            .await
            .unwrap();
        assert!(mirror.is_none(), "mirror row should be deleted");
    }
}