#![cfg(any(feature = "fjall", feature = "rocksdb"))]
mod common;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use common::TestNats;
use slipstream::snapshot::{SnapshotError, SnapshotStore};
use slipstream::{
BatchConfig, Connection, ExportRequest, KvStore, KvUpdate, NatsConnection,
NatsConnectionConfig, StoreConfig, WatchCursor, WatchScope, watch_applied,
};
use tempfile::TempDir;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::timeout;
/// Connects to the test NATS server and opens the `routes` bucket.
///
/// Returns the connection together with the store handle so the caller can keep
/// the connection alive for as long as the bucket is in use. Panics if either
/// the connect or the bucket open fails.
async fn open_bucket(nats: &TestNats) -> (NatsConnection, Arc<dyn KvStore>) {
    // Credential-less connection pointed at the server started for this test.
    let connection_config = NatsConnectionConfig {
        url: nats.url.clone(),
        creds: None,
        creds_file: None,
    };
    let conn = NatsConnection::new(connection_config);
    conn.connect().await.expect("connect");

    // 8 MiB byte limit; every other store setting keeps its default.
    let routes_config = StoreConfig {
        name: "routes".into(),
        max_bytes: Some(8 * 1024 * 1024),
        ..Default::default()
    };
    let store = conn
        .store_with_config(routes_config)
        .await
        .expect("open bucket");

    (conn, store)
}
/// Handles for one spawned `watch_applied` task, as assembled by `spawn_node`.
struct Node {
/// Sending half of the capacity-1 channel whose receiver is passed to
/// `watch_applied`; tests push `ExportRequest`s through it.
exports: mpsc::Sender<ExportRequest>,
/// Revision of the most recent cursor reported through the cursor callback
/// (0 until the first report, or when the cursor has no `u64` form).
applied: Arc<AtomicU64>,
/// Number of `KvUpdate`s the per-update callback has been invoked with.
delivered: Arc<AtomicU64>,
/// Smallest revision seen by the per-update callback; starts at `u64::MAX`.
min_rev: Arc<AtomicU64>,
/// Sending half of the `watch` channel handed to `watch_applied`; tests send
/// `true` on it before awaiting `task`.
shutdown: watch::Sender<bool>,
/// Join handle of the spawned `watch_applied` future, which resolves to the
/// task's `WatchCursor` or a `KvError`.
task: tokio::task::JoinHandle<Result<WatchCursor, slipstream::KvError>>,
}
/// Spawns a `watch_applied` task over `bucket` and returns the handles a test
/// needs to observe and control it.
///
/// `fold` is handed to `watch_applied` wrapped in `Some`, and `resume` is
/// forwarded unchanged as the optional starting cursor. The returned [`Node`]
/// shares three atomics with the callbacks installed here: a count of updates
/// seen, the smallest revision seen, and the revision of the latest cursor
/// reported. Panics if the bucket cannot produce a watcher.
///
/// NOTE(review): the roles of the three callbacks are inferred from their
/// parameter types — confirm against the `watch_applied` signature.
fn spawn_node<S: SnapshotStore + Send + 'static>(
    bucket: &Arc<dyn KvStore>,
    fold: S,
    resume: Option<WatchCursor>,
) -> Node {
    let watcher = bucket.watcher().expect("bucket watcher");
    let (exports, export_requests) = mpsc::channel(1);
    let (shutdown, shutdown_signal) = watch::channel(false);

    let applied = Arc::new(AtomicU64::new(0));
    let delivered = Arc::new(AtomicU64::new(0));
    // Starts at MAX so the first observed revision always becomes the minimum.
    let min_rev = Arc::new(AtomicU64::new(u64::MAX));

    // Per-update callback: count the update and track the lowest revision seen.
    let on_update = {
        let delivered = Arc::clone(&delivered);
        let min_rev = Arc::clone(&min_rev);
        move |update: &KvUpdate| {
            delivered.fetch_add(1, Ordering::SeqCst);
            if let Some(rev) = update.version().as_u64() {
                min_rev.fetch_min(rev, Ordering::SeqCst);
            }
            Some(())
        }
    };

    // Batch callback: the mapped values carry no data, so there is nothing to do.
    let on_batch = move |_batch: Vec<()>| {};

    // Cursor callback: publish the cursor's revision for `wait_applied` to poll.
    let on_cursor = {
        let applied = Arc::clone(&applied);
        move |cursor: WatchCursor| {
            applied.store(cursor.as_u64().unwrap_or(0), Ordering::SeqCst);
        }
    };

    let task = tokio::spawn(watch_applied(
        watcher,
        WatchScope::All,
        resume,
        None,
        Some(fold),
        Some(export_requests),
        BatchConfig::default(),
        on_update,
        on_batch,
        on_cursor,
        shutdown_signal,
    ));

    Node {
        exports,
        applied,
        delivered,
        min_rev,
        shutdown,
        task,
    }
}
/// Waits until `node` reports an applied revision of at least `at_least`.
///
/// Polls the node's `applied` counter every 20 ms and panics, reporting the
/// revision the node actually reached, if the target is not met within 10 s.
async fn wait_applied(node: &Node, at_least: u64) {
    let reached = async {
        while node.applied.load(Ordering::SeqCst) < at_least {
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
    };

    if timeout(Duration::from_secs(10), reached).await.is_err() {
        panic!(
            "node never applied rev {at_least} (at {})",
            node.applied.load(Ordering::SeqCst)
        );
    }
}
/// Backend-specific import hook: called with the exported artifact directory and
/// the destination directory, it yields the imported cursor and the opened
/// store `S`, or a `SnapshotError`.
type ImportFn<S> = fn(&Path, &Path) -> Result<(WatchCursor, S), SnapshotError>;
/// Backend-agnostic scenario: export a snapshot from a live node, bootstrap a
/// second node from it, and check that the second node only receives the
/// updates written after the export.
///
/// `open` opens (or reopens) a snapshot store at a path and returns its cursor
/// and the store; `import` materialises an exported artifact into a new
/// directory. Steps:
///
/// 1. Node A watches the bucket from scratch and folds 12 `route.pre.*` puts.
/// 2. Node A is asked to export; the manifest cursor must cover those puts.
/// 3. A tail of `m` updates (6 puts and 1 delete) is written.
/// 4. Node B is created from the imported artifact and resumed at its cursor.
/// 5. Node B must have seen exactly `m` updates starting at `cursor + 1`, and
///    its reopened store must equal the bucket contents under `route.`.
async fn live_export_bootstrap_delta_only<S>(
open: impl Fn(&Path) -> (WatchCursor, S),
import: ImportFn<S>,
) where
S: SnapshotStore + Send + 'static,
{
let nats = TestNats::start().await;
let (_conn, bucket) = open_bucket(&nats).await;
let writer = bucket.writer().expect("writer");
let dir = TempDir::new().unwrap();
// Node A starts from an empty store with no resume cursor.
let (_r, fold_a) = open(&dir.path().join("node-a"));
let node_a = spawn_node(&bucket, fold_a, None);
// Keep writing the seed key until node A reports a non-zero applied cursor;
// `attach_rev` is the revision of the last seed put issued.
let attach_rev = timeout(Duration::from_secs(10), async {
loop {
let v = writer.put("route.seed", b"seed").await.expect("seed");
tokio::time::sleep(Duration::from_millis(50)).await;
if node_a.applied.load(Ordering::SeqCst) > 0 {
return v.as_u64().expect("nats rev");
}
}
})
.await
.expect("node A watch never attached");
// Pre-export history: `n` puts, remembering the revision of the last one.
let n = 12u64;
let mut last_rev = attach_rev;
for i in 0..n {
last_rev = writer
.put(&format!("route.pre.{i}"), format!("pre-{i}").as_bytes())
.await
.expect("put")
.as_u64()
.expect("rev");
}
wait_applied(&node_a, last_rev).await;
// Ask the running node A to export into `artifact` and wait for its reply.
let artifact = dir.path().join("artifact");
let (reply_tx, reply_rx) = oneshot::channel();
node_a
.exports
.send(ExportRequest {
dest_dir: artifact.clone(),
reply: reply_tx,
})
.await
.expect("send export request");
let manifest = reply_rx.await.expect("reply").expect("export succeeds");
let export_rev = manifest.cursor.as_u64().expect("cursor rev");
assert!(
export_rev >= last_rev,
"artifact covers the pre-export history"
);
// Post-export tail: `m - 1` puts plus one delete, i.e. `m` updates in total.
let m = 7u64;
let mut final_rev = export_rev;
for i in 0..m - 1 {
final_rev = writer
.put(&format!("route.post.{i}"), format!("post-{i}").as_bytes())
.await
.expect("put")
.as_u64()
.expect("rev");
}
assert!(writer.delete("route.pre.0").await.expect("delete"));
// `delete` returns only a bool here, so the revision is advanced by hand.
// NOTE(review): assumes the delete takes exactly the next revision — confirm.
final_rev += 1;
wait_applied(&node_a, final_rev).await;
// Bootstrap node B from the artifact and resume its watch at the imported cursor.
let dest_b = dir.path().join("node-b");
let (cursor_b, fold_b) = import(&artifact, &dest_b).expect("import");
assert_eq!(
cursor_b, manifest.cursor,
"imported cursor is the manifest cursor"
);
let node_b = spawn_node(&bucket, fold_b, Some(cursor_b.clone()));
wait_applied(&node_b, final_rev).await;
// Node B must have been handed only the `m` tail updates, starting right
// after the export cursor.
assert_eq!(
node_b.delivered.load(Ordering::SeqCst),
m,
"bootstrapped node was delivered exactly the post-export tail, not a replay"
);
assert_eq!(
node_b.min_rev.load(Ordering::SeqCst),
export_rev + 1,
"the tail starts at cursor+1 — no overlap, no gap"
);
// Stop both watch tasks and surface any task panic or watch error.
node_a.shutdown.send(true).unwrap();
node_a.task.await.unwrap().unwrap();
node_b.shutdown.send(true).unwrap();
node_b.task.await.unwrap().unwrap();
// Reopen node B's store from disk and compare it with the bucket contents.
let (final_cursor, fold_b) = open(&dest_b);
assert_eq!(final_cursor.as_u64(), Some(final_rev));
let mut fold_state: Vec<(String, Vec<u8>)> = fold_b
.range("route.")
.expect("range")
.into_iter()
.map(|e| (e.key, e.value))
.collect();
// Both sides are sorted so the comparison does not depend on iteration order.
fold_state.sort();
let mut bucket_state: Vec<(String, Vec<u8>)> = bucket
.reader()
.scan("route.")
.await
.expect("scan")
.into_iter()
.map(|e| (e.key, e.value))
.collect();
bucket_state.sort();
assert_eq!(
fold_state, bucket_state,
"bootstrapped fold equals the bucket"
);
assert!(
!fold_state.iter().any(|(k, _)| k == "route.pre.0"),
"the tail's delete reached the bootstrapped fold"
);
}
#[cfg(feature = "fjall")]
mod fjall_bootstrap {
    use super::*;
    use slipstream::{FjallConfig, FjallSnapshot};

    /// Store settings used for both opening and importing in this test:
    /// `sync` off and a `64 << 20` byte cache.
    fn store_config() -> FjallConfig {
        FjallConfig {
            sync: false,
            cache_size_bytes: 64 << 20,
        }
    }

    /// Runs the shared export/bootstrap scenario against the fjall backend.
    #[tokio::test(flavor = "multi_thread")]
    async fn fjall_live_export_bootstrap_delta_only() {
        live_export_bootstrap_delta_only(
            |path| FjallSnapshot::open(path, store_config()).expect("open fjall"),
            |artifact, dest| FjallSnapshot::import(artifact, dest, store_config()),
        )
        .await;
    }
}
#[cfg(feature = "rocksdb")]
mod rocksdb_bootstrap {
    use super::*;
    use slipstream::{RocksDbConfig, RocksDbSnapshot};

    /// Store settings used for both opening and importing in this test:
    /// `sync` off and a `64 << 20` byte cache.
    fn store_config() -> RocksDbConfig {
        RocksDbConfig {
            sync: false,
            cache_size_bytes: 64 << 20,
        }
    }

    /// Runs the shared export/bootstrap scenario against the RocksDB backend.
    #[tokio::test(flavor = "multi_thread")]
    async fn rocksdb_live_export_bootstrap_delta_only() {
        live_export_bootstrap_delta_only(
            |path| RocksDbSnapshot::open(path, store_config()).expect("open rocksdb"),
            |artifact, dest| RocksDbSnapshot::import(artifact, dest, store_config()),
        )
        .await;
    }
}