use crate::circuit::GlobalNodeId;
use crate::circuit::checkpointer::Checkpointer;
use crate::circuit::metrics::{DBSP_STEP, DBSP_STEP_LATENCY_MICROSECONDS};
use crate::circuit::schedule::CommitProgress;
use crate::monitor::visual_graph::Graph;
use crate::operator::dynamic::balance::{BalancerHint, PartitioningPolicy};
use crate::storage::backend::StorageError;
use crate::trace::spine_async::MAX_LEVEL0_BATCH_SIZE_RECORDS;
use crate::{
Error as DbspError, RootCircuit, Runtime, RuntimeError, circuit::runtime::RuntimeHandle,
profile::Profiler,
};
use anyhow::Error as AnyError;
use crossbeam::channel::{Receiver, Select, Sender, TryRecvError, bounded};
use feldera_buffer_cache::ThreadType;
use feldera_ir::LirCircuit;
use feldera_storage::{FileCommitter, StorageBackend, StoragePath};
use feldera_types::checkpoint::CheckpointMetadata;
use feldera_types::config::DevTweaks;
use feldera_types::config::dev_tweaks::{BufferCacheAllocationStrategy, BufferCacheStrategy};
pub use feldera_types::config::{StorageCacheConfig, StorageConfig, StorageOptions};
use feldera_types::transaction::CommitProgressSummary;
use itertools::Either;
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{
collections::HashSet,
error::Error as StdError,
fmt::{self, Debug, Display, Error as FmtError, Formatter},
iter::empty,
net::SocketAddr,
ops::Range,
path::{Path, PathBuf},
thread::Result as ThreadResult,
time::Instant,
};
use tracing::{debug, info};
use uuid::Uuid;
#[cfg(doc)]
use crate::circuit::circuit_builder::Stream;
use crate::profile::{DbspProfile, GraphProfile, WorkerProfile};
use super::SchedulerError;
use super::circuit_builder::BootstrapInfo;
use super::runtime::WorkerPanicInfo;
const DEFAULT_MERGER_THREAD_RATIO: usize = 1;
#[allow(clippy::manual_non_exhaustive)]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct Host {
pub address: SocketAddr,
pub workers: Range<usize>,
_private: (),
}
impl Debug for Host {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Host")
.field("address", &self.address)
.field("workers", &self.workers)
.finish()
}
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum Layout {
Solo {
n_workers: usize,
},
Multihost {
hosts: Vec<Host>,
local_host_idx: usize,
},
}
impl Layout {
pub fn new_solo(n_workers: usize) -> Layout {
assert_ne!(n_workers, 0);
Layout::Solo { n_workers }
}
pub fn new_multihost(
params: &[(SocketAddr, usize)],
local_address: SocketAddr,
) -> Result<Layout, LayoutError> {
let mut uniq = HashSet::new();
if let Some((duplicate, _)) = params.iter().find(|(address, _)| !uniq.insert(address)) {
return Err(LayoutError::DuplicateAddress(*duplicate));
}
let local_host_idx = params
.iter()
.position(|(address, _)| *address == local_address)
.ok_or(LayoutError::NoSuchAddress(local_address))?;
if params.len() == 1 {
Ok(Self::new_solo(params[0].1))
} else {
let mut hosts = Vec::with_capacity(params.len());
let mut total_workers = 0;
for (address, n_workers) in params {
assert_ne!(*n_workers, 0);
hosts.push(Host {
address: *address,
workers: total_workers..total_workers + *n_workers,
_private: (),
});
total_workers += *n_workers;
}
Ok(Layout::Multihost {
hosts,
local_host_idx,
})
}
}
pub fn local_workers(&self) -> Range<usize> {
match self {
Self::Solo { n_workers } => 0..*n_workers,
Self::Multihost {
hosts,
local_host_idx,
..
} => hosts[*local_host_idx].workers.clone(),
}
}
pub fn other_hosts(&self) -> impl Iterator<Item = &Host> {
match self {
Self::Solo { .. } => Either::Left(empty()),
Self::Multihost {
hosts,
local_host_idx,
} => Either::Right(
hosts
.iter()
.enumerate()
.filter_map(|(i, host)| (i != *local_host_idx).then_some(host)),
),
}
}
pub fn local_address(&self) -> Option<SocketAddr> {
match self {
Self::Solo { .. } => None,
Self::Multihost {
hosts,
local_host_idx,
..
} => Some(hosts[*local_host_idx].address),
}
}
pub fn n_workers(&self) -> usize {
match self {
Self::Solo { n_workers } => *n_workers,
Self::Multihost { hosts, .. } => hosts.iter().map(|host| host.workers.len()).sum(),
}
}
pub fn is_multihost(&self) -> bool {
matches!(self, Self::Multihost { .. })
}
pub fn is_solo(&self) -> bool {
matches!(self, Self::Solo { .. })
}
pub fn n_hosts(&self) -> usize {
match self {
Layout::Solo { .. } => 1,
Layout::Multihost { hosts, .. } => hosts.len(),
}
}
pub fn local_host_idx(&self) -> usize {
match self {
Layout::Solo { .. } => 0,
Layout::Multihost { local_host_idx, .. } => *local_host_idx,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub enum LayoutError {
NoSuchAddress(SocketAddr),
DuplicateAddress(SocketAddr),
}
impl Display for LayoutError {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
match self {
Self::NoSuchAddress(address) => write!(f, "address {address} not in list of hosts"),
Self::DuplicateAddress(address) => {
write!(f, "duplicate address {address} in list of hosts")
}
}
}
}
impl StdError for LayoutError {}
#[derive(Clone, Default, PartialEq, Eq, Debug)]
pub enum Mode {
Persistent,
#[default]
Ephemeral,
}
#[derive(Clone)]
pub struct CircuitConfig {
pub layout: Layout,
pub max_rss_bytes: Option<u64>,
pub pin_cpus: Vec<usize>,
pub mode: Mode,
pub storage: Option<CircuitStorageConfig>,
pub dev_tweaks: DevTweaks,
}
pub fn splitter_output_chunk_size() -> usize {
Runtime::with_dev_tweaks(|d| d.splitter_chunk_size_records() as usize)
}
pub fn balancer_min_absolute_improvement_threshold() -> u64 {
Runtime::with_dev_tweaks(|d| d.balancer_min_absolute_improvement_threshold())
}
pub fn balancer_min_relative_improvement_threshold() -> f64 {
Runtime::with_dev_tweaks(|d| d.balancer_min_relative_improvement_threshold())
}
pub fn balancer_balance_tax() -> f64 {
Runtime::with_dev_tweaks(|d| d.balancer_balance_tax())
}
pub fn balancer_key_distribution_refresh_threshold() -> f64 {
Runtime::with_dev_tweaks(|d| d.balancer_key_distribution_refresh_threshold())
}
pub fn adaptive_joins_enabled() -> bool {
Runtime::with_dev_tweaks(|d| d.adaptive_joins())
}
pub fn max_level0_batch_size_records() -> u16 {
Runtime::with_dev_tweaks(|d| {
d.max_level0_batch_size_records
.unwrap_or(MAX_LEVEL0_BATCH_SIZE_RECORDS)
})
}
pub fn negative_weight_multiplier() -> u16 {
Runtime::with_dev_tweaks(|d| d.negative_weight_multiplier())
}
#[derive(Clone, derive_more::Debug)]
pub struct CircuitStorageConfig {
pub config: StorageConfig,
pub options: StorageOptions,
#[debug(skip)]
pub backend: Arc<dyn StorageBackend>,
pub init_checkpoint: Option<Uuid>,
}
impl CircuitStorageConfig {
pub fn for_config(
config: StorageConfig,
options: StorageOptions,
) -> Result<Self, StorageError> {
let backend = <dyn StorageBackend>::new(&config, &options)?;
Ok(Self {
config,
options,
backend,
init_checkpoint: None,
})
}
pub fn with_init_checkpoint(self, init_checkpoint: Option<Uuid>) -> Self {
Self {
init_checkpoint,
..self
}
}
}
impl Default for CircuitConfig {
fn default() -> Self {
Self::with_workers(1)
}
}
impl CircuitConfig {
pub fn with_workers(n: usize) -> Self {
Self {
layout: Layout::new_solo(n),
max_rss_bytes: None,
pin_cpus: Vec::new(),
mode: Mode::Ephemeral,
storage: None,
dev_tweaks: DevTweaks::default(),
}
}
pub fn with_max_rss_bytes(mut self, max_rss: Option<u64>) -> Self {
self.max_rss_bytes = max_rss;
self
}
pub fn with_mode(mut self, mode: Mode) -> Self {
self.mode = mode;
self
}
pub fn with_storage(mut self, storage: CircuitStorageConfig) -> Self {
self.storage = Some(storage);
self
}
pub fn with_splitter_chunk_size_records(mut self, records: u64) -> Self {
self.dev_tweaks.splitter_chunk_size_records = Some(records);
self
}
pub fn with_buffer_cache_strategy(mut self, strategy: BufferCacheStrategy) -> Self {
self.dev_tweaks.buffer_cache_strategy = Some(strategy);
self
}
pub fn with_buffer_max_buckets(mut self, max_buckets: Option<usize>) -> Self {
self.dev_tweaks.buffer_max_buckets = max_buckets;
self
}
pub fn with_buffer_cache_allocation_strategy(
mut self,
strategy: BufferCacheAllocationStrategy,
) -> Self {
self.dev_tweaks.buffer_cache_allocation_strategy = Some(strategy);
self
}
#[cfg(test)]
pub fn with_fbuf_slab_bytes_per_class(mut self, bytes_per_class: usize) -> Self {
self.dev_tweaks.fbuf_slab_bytes_per_class = Some(bytes_per_class);
self
}
pub fn with_balancer_min_relative_improvement_threshold(mut self, threshold: f64) -> Self {
self.dev_tweaks.balancer_min_relative_improvement_threshold = Some(threshold);
self
}
pub fn with_balancer_min_absolute_improvement_threshold(mut self, threshold: u64) -> Self {
self.dev_tweaks.balancer_min_absolute_improvement_threshold = Some(threshold);
self
}
pub fn with_balancer_balance_tax(mut self, tax: f64) -> Self {
self.dev_tweaks.balancer_balance_tax = Some(tax);
self
}
#[cfg(test)]
pub fn with_temporary_storage(self, path: impl AsRef<Path>) -> Self {
Self {
storage: Some(
CircuitStorageConfig::for_config(
StorageConfig {
path: path.as_ref().to_string_lossy().into_owned(),
cache: Default::default(),
},
Default::default(),
)
.unwrap(),
),
..self
}
}
pub(crate) fn num_merger_threads(&self) -> usize {
let num_workers = self.layout.local_workers().len();
match self.dev_tweaks.merger_threads {
Some(threads) => threads as usize,
None => num_workers * DEFAULT_MERGER_THREAD_RATIO,
}
}
}
impl From<&CircuitConfig> for CircuitConfig {
fn from(value: &CircuitConfig) -> Self {
value.clone()
}
}
impl From<usize> for CircuitConfig {
fn from(n_workers: usize) -> Self {
Self::with_workers(n_workers)
}
}
impl From<NonZeroUsize> for CircuitConfig {
fn from(n_workers: NonZeroUsize) -> Self {
Self::with_workers(n_workers.get())
}
}
impl From<Layout> for CircuitConfig {
fn from(layout: Layout) -> Self {
Self {
layout,
..Self::default()
}
}
}
impl Runtime {
pub fn init_circuit<F, T>(
config: impl Into<CircuitConfig>,
constructor: F,
) -> Result<(DBSPHandle, T), DbspError>
where
F: FnOnce(&mut RootCircuit) -> Result<T, AnyError> + Clone + Send + 'static,
T: Send + 'static,
{
let config: CircuitConfig = config.into();
let nworkers = config.layout.local_workers().len();
let (init_senders, init_receivers): (Vec<_>, Vec<_>) =
(0..nworkers).map(|_| bounded(0)).unzip();
let (command_senders, command_receivers): (Vec<_>, Vec<_>) =
(0..nworkers).map(|_| bounded(1)).unzip();
let (status_senders, status_receivers): (Vec<_>, Vec<_>) =
(0..nworkers).map(|_| bounded(1)).unzip();
let runtime = Self::run(&config, move |parker| {
let worker_index = Runtime::local_worker_offset();
let init_sender = init_senders.into_iter().nth(worker_index).unwrap();
let status_sender = status_senders.into_iter().nth(worker_index).unwrap();
let command_receiver = command_receivers.into_iter().nth(worker_index).unwrap();
let circuit_fn = |circuit: &mut RootCircuit| {
let profiler = Profiler::new(circuit);
constructor(circuit).map(|res| (res, profiler))
};
let (mut circuit, profiler) = match RootCircuit::build(circuit_fn) {
Ok((circuit, (res, profiler))) => {
if init_sender.send(Ok((res, circuit.fingerprint()))).is_err() {
return;
}
(circuit, profiler)
}
Err(e) => {
let _ = init_sender.send(Err(e));
return;
}
};
while !Runtime::kill_in_progress() {
match command_receiver.try_recv() {
Ok(Command::Transaction) => {
let status = circuit.transaction().map(|_| Response::Unit);
if status_sender.send(status).is_err() {
return;
}
}
Ok(Command::StartTransaction) => {
let status = circuit.start_transaction().map(|_| Response::Unit);
if status_sender.send(status).is_err() {
return;
}
}
Ok(Command::CommitTransaction) => {
let status = circuit.start_commit_transaction().map(|_| Response::Unit);
if status_sender.send(status).is_err() {
return;
}
}
Ok(Command::CommitProgress) => {
let status = Ok(Response::CommitProgress(circuit.commit_progress()));
if status_sender.send(status).is_err() {
return;
}
}
Ok(Command::Step) => {
let status = circuit
.step()
.map(|_| Response::CommitComplete(circuit.is_commit_complete()));
if status_sender.send(status).is_err() {
return;
}
}
Ok(Command::BootstrapStep) => {
if let Err(e) = circuit.transaction() {
if status_sender.send(Err(e)).is_err() {
return;
}
} else if status_sender
.send(Ok(Response::BootstrapComplete(
circuit.is_replay_complete(),
)))
.is_err()
{
return;
};
}
Ok(Command::CompleteBootstrap) => {
let status = circuit.complete_replay().map(|_| Response::Unit);
if status_sender.send(status).is_err() {
return;
}
}
Ok(Command::EnableProfiler) => {
profiler.enable_cpu_profiler();
if status_sender.send(Ok(Response::Unit)).is_err() {
return;
}
}
Ok(Command::DumpProfile { runtime_elapsed }) => {
if status_sender
.send(Ok(Response::ProfileDump(
profiler.dump_profile(runtime_elapsed),
)))
.is_err()
{
return;
}
}
Ok(Command::RetrieveGraph) => {
if status_sender
.send(Ok(Response::ProfileDump(profiler.dump_graph())))
.is_err()
{
return;
}
}
Ok(Command::RetrieveProfile { runtime_elapsed }) => {
if status_sender
.send(Ok(Response::Profile(profiler.profile(runtime_elapsed))))
.is_err()
{
return;
}
}
Ok(Command::Checkpoint(base)) => {
let mut files = Vec::new();
let response = circuit
.checkpoint(&base, &mut files)
.map(|_| Response::CheckpointCreated(files));
if status_sender.send(response).is_err() {
return;
}
}
Ok(Command::Restore(base)) => {
let result = circuit.restore(&base).map(Response::CheckpointRestored);
if status_sender.send(result).is_err() {
return;
}
}
Ok(Command::GetLir) => {
let lir = circuit.lir();
if status_sender.send(Ok(Response::Lir(lir))).is_err() {
return;
}
}
Ok(Command::SetBalancerHints(hints)) => {
let results = hints
.into_iter()
.map(|(global_node_id, hint)| {
circuit.set_balancer_hint(&global_node_id, hint)
})
.collect::<Vec<Result<(), DbspError>>>();
if status_sender
.send(Ok(Response::SetBalancerHints(results)))
.is_err()
{
return;
}
}
Ok(Command::GetCurrentBalancerPolicy) => {
let policy = circuit.get_current_balancer_policy();
if status_sender
.send(Ok(Response::CurrentBalancerPolicy(policy)))
.is_err()
{
return;
}
}
Ok(Command::Rebalance) => {
circuit.rebalance();
if status_sender.send(Ok(Response::Unit)).is_err() {
return;
}
}
Err(TryRecvError::Empty) => {
parker.park();
}
Err(_) => {
break;
}
}
}
})?;
let result = init_receivers
.into_iter()
.map(|receiver| {
receiver.recv().unwrap_or_else(|_| {
Err(DbspError::Runtime(RuntimeError::WorkerPanic {
panic_info: runtime.collect_panic_info(),
}))
})
})
.reduce(|old, new| {
if old.is_ok() && new.is_err() {
new
} else {
old
}
})
.unwrap();
let (ret, fingerprint) = match result {
Err(error) => {
let _ = runtime.kill();
return Err(error);
}
Ok(result) => result,
};
let (backend, init_checkpoint) = config
.storage
.map(|storage| (storage.backend.clone(), storage.init_checkpoint))
.unzip();
let mut dbsp = DBSPHandle::new(
backend,
runtime,
command_senders,
status_receivers,
fingerprint,
)?;
if let Some(init_checkpoint) = init_checkpoint.flatten() {
dbsp.send_restore(init_checkpoint.to_string().into())?;
}
Ok((dbsp, ret))
}
}
#[derive(Clone)]
enum Command {
StartTransaction,
Step,
CommitTransaction,
CommitProgress,
Transaction,
BootstrapStep,
CompleteBootstrap,
EnableProfiler,
DumpProfile {
runtime_elapsed: Duration,
},
RetrieveGraph,
RetrieveProfile {
runtime_elapsed: Duration,
},
GetLir,
Checkpoint(StoragePath),
Restore(StoragePath),
SetBalancerHints(Vec<(GlobalNodeId, BalancerHint)>),
GetCurrentBalancerPolicy,
Rebalance,
}
impl Debug for Command {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Command::StartTransaction => write!(f, "StartTransaction"),
Command::Step => write!(f, "Step"),
Command::CommitTransaction => write!(f, "CommitTransaction"),
Command::CommitProgress => write!(f, "CommitProgress"),
Command::Transaction => write!(f, "Transaction"),
Command::BootstrapStep => write!(f, "BootstrapStep"),
Command::CompleteBootstrap => write!(f, "CompleteBootstrap"),
Command::EnableProfiler => write!(f, "EnableProfiler"),
Command::RetrieveGraph => write!(f, "RetrieveGraph"),
Command::DumpProfile { runtime_elapsed } => f
.debug_struct("DumpProfile")
.field("runtime_elapsed", runtime_elapsed)
.finish(),
Command::RetrieveProfile { runtime_elapsed } => f
.debug_struct("RetrieveProfile")
.field("runtime_elapsed", runtime_elapsed)
.finish(),
Command::GetLir => write!(f, "GetLir"),
Command::Checkpoint(path) => f.debug_tuple("Checkpoint").field(path).finish(),
Command::Restore(path) => f.debug_tuple("Restore").field(path).finish(),
Command::SetBalancerHints(hints) => {
f.debug_tuple("SetBalancerHints").field(hints).finish()
}
Command::GetCurrentBalancerPolicy => write!(f, "GetCurrentBalancerPolicy"),
Command::Rebalance => write!(f, "Rebalance"),
}
}
}
#[derive(Debug)]
enum Response {
Unit,
CommitComplete(bool),
BootstrapComplete(bool),
CommitProgress(CommitProgress),
ProfileDump(Graph),
Profile(WorkerProfile),
CheckpointCreated(Vec<Arc<dyn FileCommitter>>),
CheckpointRestored(Option<BootstrapInfo>),
Lir(LirCircuit),
SetBalancerHints(Vec<Result<(), DbspError>>),
CurrentBalancerPolicy(BTreeMap<GlobalNodeId, PartitioningPolicy>),
}
#[derive(Debug)]
pub struct DBSPHandle {
start_time: Instant,
runtime_elapsed: Duration,
runtime: Option<RuntimeHandle>,
command_senders: Vec<Sender<Command>>,
status_receivers: Vec<Receiver<Result<Response, DbspError>>>,
checkpointer: Option<Arc<Mutex<Checkpointer>>>,
fingerprint: u64,
bootstrap_info: Option<BootstrapInfo>,
}
pub struct WorkersCommitProgress(BTreeMap<u16, CommitProgress>);
impl Default for WorkersCommitProgress {
fn default() -> Self {
Self::new()
}
}
impl WorkersCommitProgress {
pub fn new() -> Self {
WorkersCommitProgress(BTreeMap::new())
}
pub fn insert(&mut self, worker_id: u16, progress: CommitProgress) {
debug_assert!(!self.0.contains_key(&worker_id));
self.0.insert(worker_id, progress);
}
pub fn summary(&self) -> CommitProgressSummary {
let mut result = CommitProgressSummary::new();
for worker_progress in self.0.values() {
result.merge(&worker_progress.summary());
}
result
}
}
impl DBSPHandle {
fn new(
backend: Option<Arc<dyn StorageBackend>>,
runtime: RuntimeHandle,
command_senders: Vec<Sender<Command>>,
status_receivers: Vec<Receiver<Result<Response, DbspError>>>,
fingerprint: u64,
) -> Result<Self, DbspError> {
let checkpointer = backend
.map(|backend| {
let checkpointer = Checkpointer::new(backend)?;
if runtime.runtime().get_mode() == Mode::Ephemeral {
checkpointer.verify_fingerprint(fingerprint)?;
};
Ok::<_, DbspError>(checkpointer)
})
.transpose()?
.map(|checkpointer| Arc::new(Mutex::new(checkpointer)));
Ok(Self {
start_time: Instant::now(),
runtime: Some(runtime),
command_senders,
status_receivers,
checkpointer,
fingerprint,
runtime_elapsed: Duration::ZERO,
bootstrap_info: None,
})
}
pub fn runtime(&self) -> &Runtime {
self.runtime.as_ref().unwrap().runtime()
}
fn kill_inner(&mut self) -> ThreadResult<()> {
self.command_senders.clear();
self.status_receivers.clear();
self.runtime.take().unwrap().kill()
}
fn kill_async(&mut self) {
self.command_senders.clear();
self.status_receivers.clear();
self.runtime.take().unwrap().kill_async()
}
fn collect_panic_info(&self) -> Option<Vec<(usize, ThreadType, WorkerPanicInfo)>> {
self.runtime
.as_ref()
.map(|runtime| runtime.collect_panic_info())
}
fn panicked(&self) -> bool {
self.runtime
.as_ref()
.is_some_and(|runtime| runtime.panicked())
}
fn broadcast_command<F>(&mut self, command: Command, mut handler: F) -> Result<(), DbspError>
where
F: FnMut(usize, Response),
{
if self.runtime.is_none() {
return Err(DbspError::Runtime(RuntimeError::Terminated));
}
for (worker, sender) in self.command_senders.iter().enumerate() {
if sender.send(command.clone()).is_err() {
let panic_info = self.collect_panic_info().unwrap_or_default();
self.kill_async();
return Err(DbspError::Runtime(RuntimeError::WorkerPanic { panic_info }));
}
self.runtime.as_ref().unwrap().unpark_worker(worker);
}
let mut select = Select::new();
for receiver in self.status_receivers.iter() {
select.recv(receiver);
}
fn handle_panic(this: &mut DBSPHandle) -> Result<(), DbspError> {
let panic_info = this.collect_panic_info().unwrap_or_default();
this.kill_async();
Err(DbspError::Runtime(RuntimeError::WorkerPanic { panic_info }))
}
for _ in 0..self.status_receivers.len() {
let ready = select.select();
let worker = ready.index();
match ready.recv(&self.status_receivers[worker]) {
Err(_) => return handle_panic(self),
Ok(Err(e)) => {
let _ = self.kill_inner();
return Err(e);
}
Ok(Ok(resp)) => handler(worker, resp),
}
}
if self.panicked() {
return handle_panic(self);
}
Ok(())
}
fn unicast_command(&mut self, worker: usize, command: Command) -> Result<Response, DbspError> {
if self.runtime.is_none() {
return Err(DbspError::Runtime(RuntimeError::Terminated));
}
if self.command_senders[worker].send(command.clone()).is_err() {
let panic_info = self.collect_panic_info().unwrap_or_default();
self.kill_async();
return Err(DbspError::Runtime(RuntimeError::WorkerPanic { panic_info }));
}
self.runtime.as_ref().unwrap().unpark_worker(worker);
let reply = match self.status_receivers[worker].recv() {
Err(_) => return handle_panic(self),
Ok(Err(e)) => {
let _ = self.kill_inner();
return Err(e);
}
Ok(Ok(resp)) => resp,
};
fn handle_panic(this: &mut DBSPHandle) -> Result<Response, DbspError> {
let panic_info = this.collect_panic_info().unwrap_or_default();
this.kill_async();
Err(DbspError::Runtime(RuntimeError::WorkerPanic { panic_info }))
}
if self.panicked() {
return handle_panic(self);
}
Ok(reply)
}
pub fn transaction(&mut self) -> Result<(), DbspError> {
if self.bootstrap_in_progress() {
self.step_bootstrap()
} else {
self.transaction_regular()
}
}
pub fn start_transaction(&mut self) -> Result<(), DbspError> {
let start = Instant::now();
let result = self.broadcast_command(Command::StartTransaction, |_, _| {});
if let Some(handle) = self.runtime.as_ref() {
self.runtime_elapsed +=
start.elapsed() * handle.runtime().layout().local_workers().len() as u32 * 2;
}
result
}
pub fn step(&mut self) -> Result<bool, DbspError> {
let start = Instant::now();
let mut commit_complete = Vec::with_capacity(self.status_receivers.len());
let result = self.broadcast_command(Command::Step, |_worker, response| {
let Response::CommitComplete(complete) = response else {
panic!("Expected CommitComplete response, got {response:?}");
};
commit_complete.push(complete);
});
if let Some(handle) = self.runtime.as_ref() {
self.runtime_elapsed +=
start.elapsed() * handle.runtime().layout().local_workers().len() as u32 * 2;
}
result?;
let commit_complete = commit_complete.iter().any(|complete| *complete);
if commit_complete {
debug!("Commit complete");
}
Ok(commit_complete)
}
pub fn start_commit_transaction(&mut self) -> Result<(), DbspError> {
let start = Instant::now();
let result = self.broadcast_command(Command::CommitTransaction, |_, _| {});
if let Some(handle) = self.runtime.as_ref() {
self.runtime_elapsed +=
start.elapsed() * handle.runtime().layout().local_workers().len() as u32 * 2;
}
result
}
pub fn commit_transaction(&mut self) -> Result<(), DbspError> {
self.start_commit_transaction()?;
loop {
let commit_complete = self.step()?;
if commit_complete {
return Ok(());
}
}
}
pub fn commit_progress(&mut self) -> Result<WorkersCommitProgress, DbspError> {
let mut progress = WorkersCommitProgress::new();
self.broadcast_command(Command::CommitProgress, |worker, response| {
let Response::CommitProgress(worker_progress) = response else {
panic!("Expected CommitProgress response, got {response:?}");
};
progress.insert(worker as u16, worker_progress);
})?;
Ok(progress)
}
pub fn set_replay_step_size(&mut self, step_size: usize) {
if let Some(handle) = self.runtime.as_ref() {
handle.runtime().set_replay_step_size(step_size);
}
}
pub fn get_replay_step_size(&self) -> usize {
if let Some(handle) = self.runtime.as_ref() {
handle.runtime().get_replay_step_size()
} else {
0
}
}
fn transaction_regular(&mut self) -> Result<(), DbspError> {
DBSP_STEP.fetch_add(1, Ordering::Relaxed);
let start = Instant::now();
let result = self.broadcast_command(Command::Transaction, |_, _| {});
DBSP_STEP_LATENCY_MICROSECONDS
.lock()
.unwrap()
.record_elapsed(start);
if let Some(handle) = self.runtime.as_ref() {
self.runtime_elapsed +=
start.elapsed() * handle.runtime().layout().local_workers().len() as u32 * 2;
}
result
}
fn step_bootstrap(&mut self) -> Result<(), DbspError> {
DBSP_STEP.fetch_add(1, Ordering::Relaxed);
let start = Instant::now();
let mut replay_complete = Vec::with_capacity(self.status_receivers.len());
let result = self.broadcast_command(Command::BootstrapStep, |_worker, response| {
let Response::BootstrapComplete(complete) = response else {
panic!("Expected BootstrapComplete response, got {response:?}");
};
replay_complete.push(complete);
});
DBSP_STEP_LATENCY_MICROSECONDS
.lock()
.unwrap()
.record_elapsed(start);
if let Some(handle) = self.runtime.as_ref() {
self.runtime_elapsed +=
start.elapsed() * handle.runtime().layout().local_workers().len() as u32 * 2;
}
result?;
if replay_complete.iter().all(|complete| *complete) {
info!("Bootstrap complete");
self.send_complete_bootstrap()?;
}
Ok(())
}
pub fn bootstrap_in_progress(&self) -> bool {
self.bootstrap_info.is_some()
}
pub fn bootstrap_info(&self) -> &Option<BootstrapInfo> {
&self.bootstrap_info
}
pub fn runtime_elapsed(&self) -> Duration {
self.runtime_elapsed
}
pub fn fingerprint(&self) -> u64 {
self.fingerprint
}
fn send_restore(&mut self, base: StoragePath) -> Result<(), DbspError> {
let mut worker_replay_info = BTreeMap::<usize, Option<BootstrapInfo>>::new();
self.broadcast_command(Command::Restore(base), |worker, resp| {
let Response::CheckpointRestored(replay_info) = resp else {
panic!("Expected checkpoint restore response, got {resp:?}");
};
worker_replay_info.insert(worker, replay_info);
})?;
for i in 1..worker_replay_info.len() {
if worker_replay_info[&i] != worker_replay_info[&0] {
let mut info = Vec::new();
for j in 0..worker_replay_info.len() {
info.push(format!(
" worker {j} replay info: {:?}",
worker_replay_info[&j]
));
}
let info = info.join("\n");
return Err(DbspError::Scheduler(SchedulerError::ReplayInfoConflict {
error: format!(
"worker 0 and worker {i} returned different replay info after restarting from a checkpoint; this can be caused by a bug or data corruption; replay info\n{info}"
),
}));
}
}
self.bootstrap_info = worker_replay_info[&0].clone();
if let Some(bootstrap_info) = &self.bootstrap_info {
info!(
"Circuit restored from checkpoint, bootstrapping new parts of the circuit: {bootstrap_info:?}"
);
}
Ok(())
}
fn send_complete_bootstrap(&mut self) -> Result<(), DbspError> {
self.broadcast_command(Command::CompleteBootstrap, |_, _| {})?;
self.bootstrap_info = None;
Ok(())
}
fn checkpointer(&self) -> Result<&Arc<Mutex<Checkpointer>>, DbspError> {
self.checkpointer
.as_ref()
.ok_or(DbspError::Storage(StorageError::StorageDisabled))
}
pub fn checkpoint(&mut self) -> CheckpointBuilder<'_> {
CheckpointBuilder::new(self)
}
pub fn list_checkpoints(&mut self) -> Result<Vec<CheckpointMetadata>, DbspError> {
self.checkpointer()?.lock().unwrap().list_checkpoints()
}
pub fn gc_checkpoint(
&mut self,
except: HashSet<uuid::Uuid>,
) -> Result<HashSet<uuid::Uuid>, DbspError> {
self.checkpointer()?.lock().unwrap().gc_checkpoint(except)
}
pub fn enable_cpu_profiler(&mut self) -> Result<(), DbspError> {
self.broadcast_command(Command::EnableProfiler, |_, _| {})
}
pub fn dump_profile<P: AsRef<Path>>(&mut self, dir_path: P) -> Result<PathBuf, DbspError> {
Ok(self.graph_profile()?.dump(dir_path)?)
}
pub fn graph_profile(&mut self) -> Result<GraphProfile, DbspError> {
let mut worker_graphs = vec![Default::default(); self.status_receivers.len()];
self.broadcast_command(
Command::DumpProfile {
runtime_elapsed: self.runtime_elapsed(),
},
|worker, resp| {
if let Response::ProfileDump(prof) = resp {
worker_graphs[worker] = prof;
}
},
)?;
Ok(GraphProfile {
elapsed_time: self.start_time.elapsed(),
worker_offset: self.runtime().layout().local_workers().start,
worker_graphs,
})
}
pub fn retrieve_profile(&mut self) -> Result<DbspProfile, DbspError> {
let mut profiles = vec![Default::default(); self.status_receivers.len()];
let mut graphs = vec![Default::default(); self.status_receivers.len()];
self.broadcast_command(
Command::RetrieveProfile {
runtime_elapsed: self.runtime_elapsed(),
},
|worker, resp| {
if let Response::Profile(prof) = resp {
profiles[worker] = prof;
}
},
)?;
self.broadcast_command(Command::RetrieveGraph, |worker, resp| {
if let Response::ProfileDump(graph) = resp {
graphs[worker] = graph;
}
})?;
Ok(DbspProfile::new(profiles, graphs.pop()))
}
pub fn lir(&mut self) -> Result<LirCircuit, DbspError> {
let mut lirs = vec![Default::default(); self.status_receivers.len()];
self.broadcast_command(Command::GetLir, |worker, resp| {
if let Response::Lir(lir) = resp {
lirs[worker] = lir;
}
})?;
Ok(lirs.remove(0))
}
pub fn kill(mut self) -> ThreadResult<()> {
if self.runtime.is_none() {
return Ok(());
}
self.kill_inner()
}
pub fn set_balancer_hint(
&mut self,
global_node_id: &GlobalNodeId,
hint: BalancerHint,
) -> Result<(), DbspError> {
let mut result = self.set_balancer_hints(vec![(global_node_id.clone(), hint)])?;
result.pop().unwrap()
}
pub fn set_balancer_hints(
&mut self,
hints: Vec<(GlobalNodeId, BalancerHint)>,
) -> Result<Vec<Result<(), DbspError>>, DbspError> {
let mut results = Vec::new();
self.broadcast_command(Command::SetBalancerHints(hints), |_, resp| {
let Response::SetBalancerHints(worker_results) = resp else {
panic!("Expected SetBalancerHints response, got {resp:?}");
};
results = worker_results;
})?;
Ok(results)
}
pub fn get_current_balancer_policy(
&mut self,
) -> Result<BTreeMap<GlobalNodeId, PartitioningPolicy>, DbspError> {
let resp = self.unicast_command(0, Command::GetCurrentBalancerPolicy)?;
let Response::CurrentBalancerPolicy(policy) = resp else {
panic!("Expected GetCurrentBalancerPolicy policy response, got {resp:?}");
};
Ok(policy)
}
pub fn rebalance(&mut self) -> Result<(), DbspError> {
self.broadcast_command(Command::Rebalance, |_, _| {})?;
Ok(())
}
}
impl Drop for DBSPHandle {
fn drop(&mut self) {
if self.runtime.is_some() {
let _ = self.kill_inner();
}
}
}
#[derive(Debug)]
pub struct CheckpointBuilder<'a> {
handle: &'a mut DBSPHandle,
name: Option<String>,
steps: Option<u64>,
processed_records: Option<u64>,
}
impl<'a> CheckpointBuilder<'a> {
fn new(handle: &'a mut DBSPHandle) -> Self {
Self {
handle,
name: None,
steps: None,
processed_records: None,
}
}
pub fn with_name(self, name: impl Into<String>) -> Self {
Self {
name: Some(name.into()),
..self
}
}
pub fn with_steps(self, steps: u64) -> Self {
Self {
steps: Some(steps),
..self
}
}
pub fn with_processed_records(self, processed_records: u64) -> Self {
Self {
processed_records: Some(processed_records),
..self
}
}
pub fn run(self) -> Result<CheckpointMetadata, DbspError> {
self.prepare().and_then(CheckpointCommitter::commit)
}
pub fn prepare(self) -> Result<CheckpointCommitter, DbspError> {
let checkpointer = self.handle.checkpointer()?.clone();
let uuid = Uuid::now_v7();
let checkpoint_dir = Checkpointer::checkpoint_dir(uuid);
let mut readers = Vec::new();
self.handle
.broadcast_command(Command::Checkpoint(checkpoint_dir), |_worker, resp| {
let Response::CheckpointCreated(r) = resp else {
panic!("Expected checkpoint response, got {resp:?}");
};
readers.push(r);
})?;
Ok(CheckpointCommitter {
checkpointer,
uuid,
readers,
fingerprint: self.handle.fingerprint,
name: self.name,
steps: self.steps,
processed_records: self.processed_records,
})
}
}
pub struct CheckpointCommitter {
checkpointer: Arc<Mutex<Checkpointer>>,
uuid: Uuid,
readers: Vec<Vec<Arc<dyn FileCommitter>>>,
fingerprint: u64,
name: Option<String>,
steps: Option<u64>,
processed_records: Option<u64>,
}
impl CheckpointCommitter {
pub fn commit(self) -> Result<CheckpointMetadata, DbspError> {
for reader in self.readers.into_iter().flatten() {
reader.commit()?;
}
self.checkpointer.lock().unwrap().commit(
self.uuid,
self.fingerprint,
self.name,
self.steps,
self.processed_records,
)
}
}
#[cfg(test)]
pub(crate) mod tests {
use std::fs::{File, create_dir_all};
use std::io;
use std::path::Path;
use std::time::Duration;
use std::{fs, vec};
use super::{CircuitStorageConfig, Mode};
use crate::circuit::checkpointer::Checkpointer;
use crate::circuit::dbsp_handle::DevTweaks;
use crate::circuit::runtime::TOKIO_WORKER_INDEX;
use crate::circuit::{CircuitConfig, Layout};
use crate::dynamic::{ClonableTrait, DowncastTrait, DynData, Erase};
use crate::operator::Generator;
use crate::operator::TraceBound;
use crate::storage::backend::StorageError;
use crate::trace::BatchReaderFactories;
use crate::utils::Tup2;
use crate::{
Circuit, DBSPHandle, Error as DbspError, IndexedZSetHandle, InputHandle, OrdZSet,
OutputHandle, Runtime, RuntimeError, ZSetHandle, ZWeight,
};
use anyhow::anyhow;
use feldera_buffer_cache::ThreadType;
use feldera_types::config::{StorageCacheConfig, StorageConfig, StorageOptions};
use tempfile::{TempDir, tempdir};
use uuid::Uuid;
#[test]
fn test_panic_in_worker1() {
test_panic_in_worker(1);
}
#[test]
fn test_panic_in_worker4() {
test_panic_in_worker(4);
}
fn test_panic_in_worker(nworkers: usize) {
let res = Runtime::init_circuit(nworkers, |circuit| {
if Runtime::worker_index() == 0 {
panic!();
}
circuit.add_source(Generator::new(|| 5usize));
Ok(())
});
if let DbspError::Runtime(err) = res.unwrap_err() {
assert!(matches!(err, RuntimeError::WorkerPanic { .. }));
} else {
panic!();
}
}
#[test]
fn test_step_panic1() {
test_step_panic(1);
}
#[test]
fn test_step_panic4() {
test_step_panic(4);
}
fn test_step_panic(nworkers: usize) {
let (mut handle, _) = Runtime::init_circuit(nworkers, |circuit| {
circuit.add_source(Generator::new(|| {
if Runtime::worker_index() == 0 {
panic!()
} else {
5usize
}
}));
Ok(())
})
.unwrap();
if let DbspError::Runtime(err) = handle.transaction().unwrap_err() {
matches!(err, RuntimeError::WorkerPanic { .. });
} else {
panic!();
}
}
#[test]
fn test_panic_no_deadlock() {
let (mut handle, _) = Runtime::init_circuit(4, |circuit| {
circuit.add_source(Generator::new(|| {
if Runtime::worker_index() == 1 {
panic!()
} else {
std::thread::sleep(Duration::MAX)
}
}));
Ok(())
})
.unwrap();
if let DbspError::Runtime(err) = handle.transaction().unwrap_err() {
matches!(err, RuntimeError::WorkerPanic { .. });
} else {
panic!();
}
}
#[test]
fn test_panic_in_tokio_merger_runtime() {
let (panic_tx, panic_rx) = std::sync::mpsc::channel();
let (mut handle, _) = Runtime::init_circuit(1, move |circuit| {
let (_stream, _input_handle) = circuit.add_input_map::<u64, u64, i64, _>(|v, u| {
*v = ((*v as i64) + *u) as u64;
});
if Runtime::worker_index() == 0 {
let runtime = Runtime::runtime().unwrap();
let panic_tx = panic_tx.clone();
runtime.tokio_merger_runtime().spawn(async move {
TOKIO_WORKER_INDEX
.scope(0, async move {
let _ = std::panic::catch_unwind(|| {
panic!("panic from tokio merger runtime task");
});
let _ = panic_tx.send(());
})
.await;
});
}
Ok(())
})
.unwrap();
panic_rx
.recv_timeout(Duration::from_secs(5))
.expect("timed out waiting for panic task to complete");
if let DbspError::Runtime(err) = handle.transaction().unwrap_err() {
println!("error: {err}");
match err {
RuntimeError::WorkerPanic { panic_info } => {
assert!(
panic_info
.iter()
.any(|(_worker, thread_type, _info)| *thread_type
== ThreadType::Background),
"expected WorkerPanic to include background worker panic info"
);
}
_ => panic!(),
}
} else {
panic!();
}
}
#[test]
fn test_kill1() {
test_kill(1);
}
#[test]
fn test_kill4() {
test_kill(4);
}
fn test_kill(nworkers: usize) {
let (mut handle, _) = Runtime::init_circuit(nworkers, |circuit| {
circuit.add_source(Generator::new(|| 5usize));
Ok(())
})
.unwrap();
handle.enable_cpu_profiler().unwrap();
handle.transaction().unwrap();
handle
.dump_profile(std::env::temp_dir().join("test_kill"))
.unwrap();
handle.kill().unwrap();
}
#[test]
fn test_drop1() {
test_drop(1);
}
#[test]
fn test_drop4() {
test_drop(4);
}
fn test_drop(nworkers: usize) {
let (mut handle, _) = Runtime::init_circuit(nworkers, |circuit| {
circuit.add_source(Generator::new(|| 5usize));
Ok(())
})
.unwrap();
handle.transaction().unwrap();
}
#[test]
fn test_failing_constructor() {
match Runtime::init_circuit(4, |_circuit| Err::<(), _>(anyhow!("constructor failed"))) {
Err(DbspError::Constructor(msg)) => assert_eq!(msg.to_string(), "constructor failed"),
_ => panic!(),
}
}
type CircuitHandle = (
IndexedZSetHandle<i32, i32>,
OutputHandle<OrdZSet<Tup2<i32, i32>>>,
InputHandle<usize>,
);
fn mkcircuit(cconf: &CircuitConfig) -> Result<(DBSPHandle, CircuitHandle), DbspError> {
Runtime::init_circuit(cconf, move |circuit| {
let (stream, handle) = circuit.add_input_indexed_zset::<i32, i32>();
let (sample_size_stream, sample_size_handle) = circuit.add_input_stream::<usize>();
let map_factories = BatchReaderFactories::new::<Tup2<i32, i32>, (), ZWeight>();
let sample_handle = stream
.integrate_trace()
.stream_sample_unique_key_vals(&sample_size_stream)
.inner()
.dyn_map(
&map_factories,
Box::new(|kinput, kv| {
let kinput: &DynData =
unsafe { kinput.downcast::<Tup2<i32, i32>>() }.erase();
kinput.clone_to(kv.split_mut().0);
}),
)
.typed()
.output();
Ok((handle, sample_handle, sample_size_handle))
})
}
fn mkcircuit_different(
cconf: &CircuitConfig,
) -> Result<(DBSPHandle, CircuitHandle), DbspError> {
Runtime::init_circuit(cconf, move |circuit| {
let map_factories = BatchReaderFactories::new::<Tup2<i32, i32>, (), ZWeight>();
let (stream, handle) = circuit.add_input_indexed_zset::<i32, i32>();
let (sample_size_stream, sample_size_handle) = circuit.add_input_stream::<usize>();
let sample_handle = stream
.integrate_trace()
.stream_sample_unique_key_vals(&sample_size_stream)
.inner()
.dyn_map(
&map_factories,
Box::new(|kinput, kv| {
let kinput: &DynData =
unsafe { kinput.downcast::<Tup2<i32, i32>>() }.erase();
kinput.clone_to(kv.split_mut().0);
}),
)
.typed()
.output();
let _sample_handle2: OutputHandle<OrdZSet<Tup2<i32, i32>>> = stream
.integrate_trace()
.stream_sample_unique_key_vals(&sample_size_stream)
.inner()
.dyn_map(
&map_factories,
Box::new(|kinput, kv| {
let kinput: &DynData =
unsafe { kinput.downcast::<Tup2<i32, i32>>() }.erase();
kinput.clone_to(kv.split_mut().0);
}),
)
.typed()
.output();
Ok((handle, sample_handle, sample_size_handle))
})
}
#[allow(clippy::type_complexity)]
fn mkcircuit_with_bounds(
cconf: &CircuitConfig,
) -> Result<(DBSPHandle, CircuitHandle), DbspError> {
Runtime::init_circuit(cconf, move |circuit| {
let (stream, handle) = circuit.add_input_indexed_zset::<i32, i32>();
let (sample_size_stream, sample_size_handle) = circuit.add_input_stream::<usize>();
let map_factories = BatchReaderFactories::new::<Tup2<i32, i32>, (), ZWeight>();
let tb = TraceBound::new();
tb.set(Box::new(10).erase_box());
let sample_handle = stream
.integrate_trace_with_bound(tb.clone(), tb)
.stream_sample_unique_key_vals(&sample_size_stream)
.inner()
.dyn_map(
&map_factories,
Box::new(|kinput, kv| {
let kinput: &DynData =
unsafe { kinput.downcast::<Tup2<i32, i32>>() }.erase();
kinput.clone_to(kv.split_mut().0);
}),
)
.typed()
.output();
Ok((handle, sample_handle, sample_size_handle))
})
}
pub(crate) fn mkconfig() -> (TempDir, CircuitConfig) {
let temp = tempdir().expect("Can't create temp dir for storage");
let cconf = CircuitConfig {
layout: Layout::new_solo(1),
max_rss_bytes: None,
mode: Mode::Ephemeral,
pin_cpus: Vec::new(),
storage: Some(
CircuitStorageConfig::for_config(
StorageConfig {
path: temp.path().to_string_lossy().into_owned(),
cache: StorageCacheConfig::default(),
},
StorageOptions {
min_storage_bytes: Some(0),
..StorageOptions::default()
},
)
.unwrap(),
),
dev_tweaks: DevTweaks::default(),
};
(temp, cconf)
}
fn generic_checkpoint_restore(
input: Vec<Vec<Tup2<i32, Tup2<i32, i64>>>>,
circuit_fun: fn(&CircuitConfig) -> Result<(DBSPHandle, CircuitHandle), DbspError>,
) {
const SAMPLE_SIZE: usize = 25; assert!(input.len() < SAMPLE_SIZE, "input should be <SAMPLE_SIZE");
let (_temp, mut cconf) = mkconfig();
let mut committed = vec![];
let mut checkpoints = vec![];
{
let (mut dbsp, (input_handle, output_handle, sample_size_handle)) =
circuit_fun(&cconf).unwrap();
for mut batch in input.clone() {
let cpm = dbsp.checkpoint().run().expect("commit shouldn't fail");
checkpoints.push(cpm);
sample_size_handle.set_for_all(SAMPLE_SIZE);
input_handle.append(&mut batch);
dbsp.transaction().unwrap();
let res = output_handle.take_from_all();
committed.push(res[0].clone());
}
}
assert_eq!(committed.len(), input.len());
let mut batches_to_insert = input.clone();
for (i, cpm) in checkpoints.iter().enumerate() {
cconf.storage.as_mut().unwrap().init_checkpoint = Some(cpm.uuid);
let (mut dbsp, (input_handle, output_handle, sample_size_handle)) =
mkcircuit(&cconf).unwrap();
sample_size_handle.set_for_all(SAMPLE_SIZE);
input_handle.append(&mut batches_to_insert[i]);
dbsp.transaction().unwrap();
let res = output_handle.take_from_all();
let expected_zset = committed[i].clone();
assert_eq!(expected_zset, res[0]);
}
}
#[test]
fn can_find_batches_for_checkpoint() {
let (_temp, cconf) = mkconfig();
let (mut dbsp, (input_handle, _, _)) = mkcircuit(&cconf).unwrap();
let mut batch = vec![Tup2(1, Tup2(2, 1))];
input_handle.append(&mut batch);
dbsp.transaction().unwrap();
let cpm = dbsp.checkpoint().run().expect("commit failed");
let batchfiles = dbsp
.checkpointer
.as_ref()
.unwrap()
.lock()
.unwrap()
.gather_batches_for_checkpoint(&cpm)
.expect("failed to gather batches");
assert_eq!(batchfiles.len(), 1);
}
#[test]
fn checkpoint_file() {
let (_temp, cconf) = mkconfig();
{
let (mut dbsp, (_input_handle, _output_handle, sample_size_handle)) =
mkcircuit(&cconf).unwrap();
sample_size_handle.set_for_all(2);
dbsp.transaction().unwrap();
dbsp.checkpoint()
.with_name("test-commit")
.run()
.expect("commit failed");
dbsp.transaction().unwrap();
dbsp.checkpoint().run().expect("commit failed");
}
{
let (dbsp, _) = mkcircuit(&cconf).unwrap();
let cpm = &dbsp
.checkpointer
.as_ref()
.unwrap()
.lock()
.unwrap()
.list_checkpoints()
.unwrap()[0];
assert_ne!(cpm.uuid, Uuid::nil());
assert_eq!(cpm.identifier, Some(String::from("test-commit")));
let cpm2 = &dbsp
.checkpointer
.as_ref()
.unwrap()
.lock()
.unwrap()
.list_checkpoints()
.unwrap()[1];
assert_ne!(cpm2.uuid, Uuid::nil());
assert_ne!(cpm2.uuid, cpm.uuid);
assert_eq!(cpm2.identifier, None);
}
}
#[test]
fn circuit_takes_ownership_of_storage_dir() {
let (_temp, cconf) = mkconfig();
let (_dbsp, _) = mkcircuit(&cconf).unwrap();
let r = Runtime::init_circuit(cconf, |_circuit| Ok(()));
assert!(matches!(
r,
Err(DbspError::Storage(StorageError::StorageLocked(_, _)))
));
}
#[test]
fn revert_to_unknown_checkpoint() {
let (_temp, mut cconf) = mkconfig();
let (dbsp, _) = mkcircuit(&cconf).unwrap();
drop(dbsp);
cconf.storage.as_mut().unwrap().init_checkpoint = Some(Uuid::now_v7());
let res = mkcircuit(&cconf);
let Err(err) = res else {
panic!("revert_to_unknown_checkpoint is supposed to fail");
};
assert!(matches!(
err,
DbspError::Storage(StorageError::CheckpointNotFound(_))
));
}
#[test]
#[should_panic]
fn revert_to_partial_checkpoint() {
let (temp, mut cconf) = mkconfig();
let (dbsp, _) = mkcircuit(&cconf).unwrap();
drop(dbsp);
let init_checkpoint = Uuid::now_v7(); cconf.storage.as_mut().unwrap().init_checkpoint = Some(init_checkpoint);
let checkpoint_dir = temp.path().join(init_checkpoint.to_string());
create_dir_all(checkpoint_dir).expect("can't create checkpoint dir");
mkcircuit(&cconf).unwrap();
}
fn init_test_tracing() {
let _ = tracing_subscriber::fmt::try_init();
}
#[test]
fn gc_commits() {
init_test_tracing();
let (temp, cconf) = mkconfig();
fn count_directory_entries<P: AsRef<Path>>(path: P) -> io::Result<usize> {
let mut file_count = 0;
let entries = fs::read_dir(path)?;
for entry in entries {
let _entry = entry?;
file_count += 1;
}
Ok(file_count)
}
let (mut dbsp, (input_handle, _, _)) = mkcircuit(&cconf).unwrap();
let _cpm = dbsp.checkpoint().run().expect("commit failed");
let mut batches: Vec<Vec<Tup2<i32, Tup2<i32, i64>>>> = vec![
vec![Tup2(1, Tup2(2, 1))],
vec![Tup2(2, Tup2(3, 1))],
vec![Tup2(3, Tup2(4, 1))],
vec![Tup2(3, Tup2(4, 1))],
vec![Tup2(1, Tup2(2, 1))],
vec![Tup2(2, Tup2(3, 1))],
vec![Tup2(3, Tup2(4, 1))],
vec![Tup2(3, Tup2(4, 1))],
];
for chunk in batches.chunks_mut(2) {
input_handle.append(&mut chunk[0]);
input_handle.append(&mut chunk[1]);
dbsp.transaction().unwrap();
let _cpm = dbsp.checkpoint().run().expect("commit failed");
}
let prev_count = count_directory_entries(temp.path()).unwrap();
let num_checkpoints = dbsp.list_checkpoints().unwrap().len();
assert!(num_checkpoints > Checkpointer::MIN_CHECKPOINT_THRESHOLD);
let _r = dbsp.gc_checkpoint(std::collections::HashSet::new());
let count = count_directory_entries(temp.path()).unwrap();
assert!(count < prev_count);
assert!(dbsp.list_checkpoints().unwrap().len() <= Checkpointer::MIN_CHECKPOINT_THRESHOLD);
}
#[test]
fn gc_on_startup() {
init_test_tracing();
let (temp, cconf) = mkconfig();
let (mut dbsp, (input_handle, _, _)) = mkcircuit(&cconf).unwrap();
let mut batch: Vec<Tup2<i32, Tup2<i32, i64>>> = vec![Tup2(1, Tup2(2, 1))];
input_handle.append(&mut batch);
dbsp.checkpoint().run().expect("commit shouldn't fail");
drop(dbsp);
let incomplete_batch_path = temp.path().join("incomplete_batch.mut");
let _ = File::create(&incomplete_batch_path).expect("can't create file");
let incomplete_checkpoint_dir = temp.path().join(Uuid::now_v7().to_string());
fs::create_dir(&incomplete_checkpoint_dir).expect("can't create checkpoint dir");
let _ = File::create(incomplete_checkpoint_dir.join("filename.feldera"))
.expect("can't create file");
let complete_batch_unused = temp.path().join("complete_batch.feldera");
let _ = File::create(&complete_batch_unused).expect("can't create file");
let (_dbsp, _) = mkcircuit(&cconf).unwrap();
assert!(!incomplete_checkpoint_dir.exists());
assert!(!incomplete_batch_path.exists());
assert!(!complete_batch_unused.exists());
}
#[test]
fn commit_restore() {
init_test_tracing();
let batches: Vec<Vec<Tup2<i32, Tup2<i32, i64>>>> = vec![
vec![Tup2(1, Tup2(2, 1))],
vec![Tup2(3, Tup2(4, 1))],
vec![Tup2(5, Tup2(6, 1))],
vec![Tup2(7, Tup2(8, 1))],
vec![Tup2(9, Tup2(10, 1))],
];
generic_checkpoint_restore(batches, mkcircuit);
}
#[test]
#[ignore]
fn commit_restore_bounds() {
init_test_tracing();
let batches: Vec<Vec<Tup2<i32, Tup2<i32, i64>>>> = vec![
vec![Tup2(1, Tup2(2, 1))],
vec![Tup2(7, Tup2(8, 1))],
vec![Tup2(9, Tup2(10, 1))],
vec![Tup2(12, Tup2(12, 1))],
vec![Tup2(13, Tup2(13, 1))],
];
generic_checkpoint_restore(batches, mkcircuit_with_bounds);
}
#[test]
fn fingerprint_is_different() {
let (_tempdir, cconf) = mkconfig();
let fid1 = mkcircuit(&cconf).unwrap().0.fingerprint();
let fid2 = mkcircuit_different(&cconf).unwrap().0.fingerprint();
assert_ne!(fid1, fid2);
let fid3 = mkcircuit_with_bounds(&cconf).unwrap().0.fingerprint();
assert_eq!(fid1, fid3); }
#[test]
#[should_panic]
fn reject_different_fingerprint() {
let (_temp, mut cconf) = mkconfig();
let (mut dbsp, (input_handle, _, _)) = mkcircuit(&cconf).unwrap();
let mut batch: Vec<Tup2<i32, Tup2<i32, i64>>> = vec![Tup2(1, Tup2(2, 1))];
input_handle.append(&mut batch);
let cpi = dbsp.checkpoint().run().expect("commit shouldn't fail");
drop(dbsp);
cconf.storage.as_mut().unwrap().init_checkpoint = Some(cpi.uuid);
let (dbsp_different, (_input_handle, _, _sample_size_handle)) =
mkcircuit_different(&cconf).unwrap();
drop(dbsp_different);
}
#[test]
#[allow(clippy::borrowed_box)]
fn test_z1_checkpointing() {
let (_temp, mut cconf) = mkconfig();
let expected_waterlines = vec![115, 115, 125, 145];
fn mkcircuit(
cconf: &CircuitConfig,
mut expected_waterline: vec::IntoIter<i32>,
) -> (DBSPHandle, ZSetHandle<i32>) {
Runtime::init_circuit(cconf, move |circuit| {
let (stream, handle) = circuit.add_input_zset();
stream
.waterline_monotonic(|| 0, |ts| ts + 5)
.inner_data()
.inspect(move |waterline: &Box<DynData>| {
if Runtime::worker_index() == 0 {
assert_eq!(
waterline.downcast_checked::<i32>(),
&expected_waterline.next().unwrap()
);
}
});
Ok(handle)
})
.unwrap()
}
let batches = vec![
vec![Tup2(100, 1), Tup2(110, 1), Tup2(50, 1)],
vec![Tup2(90, 1), Tup2(90, 1), Tup2(50, 1)],
vec![Tup2(110, 1), Tup2(120, 1), Tup2(100, 1)],
vec![Tup2(130, 1), Tup2(140, 1), Tup2(0, 1)],
];
for (idx, mut batch) in batches.into_iter().enumerate() {
let expected_waterlines = expected_waterlines.clone();
let expected_waterlines: Vec<i32> = expected_waterlines[idx..].into();
let (mut dbsp, input_handle) = mkcircuit(&cconf, expected_waterlines.into_iter());
input_handle.append(&mut batch);
dbsp.transaction().unwrap();
let cpm = dbsp.checkpoint().run().unwrap();
cconf.storage.as_mut().unwrap().init_checkpoint = Some(cpm.uuid);
dbsp.kill().unwrap();
}
}
}