use std::collections::{btree_map::Entry, BTreeMap};
use timely::dataflow::operators::probe;
use crate::{operators::arrange::TraceAgent, trace::TraceReader};
/// Alias for timely's `probe::Handle<T>`, the probe handle type stored by this module.
pub type ProbeHandle<T> = probe::Handle<T>;
/// Alias for this crate's `TraceAgent<Tr>`, the trace handle type stored by this module.
pub type TraceHandle<Tr> = TraceAgent<Tr>;
/// The pair of handles kept for one built plan: a probe handle and a trace handle.
///
/// NOTE(review): in this file's tests both handles come from the same arranged
/// collection; nothing in the type enforces that — confirm for other callers.
pub struct RegisteredPlan<T, Tr>
where
T: timely::progress::Timestamp,
Tr: TraceReader,
{
/// Probe handle over timestamp type `T`.
pub probe: ProbeHandle<T>,
/// Trace handle for trace type `Tr`.
pub trace: TraceHandle<Tr>,
}
/// Compact debug view of a [`RegisteredPlan`].
///
/// Only the number of elements in the probe's current frontier is printed, under
/// the `frontier_len` key. The trace handle is not shown, and the output is marked
/// non-exhaustive to signal the omission.
impl<T, Tr> std::fmt::Debug for RegisteredPlan<T, Tr>
where
    T: timely::progress::Timestamp,
    Tr: TraceReader,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Read the frontier size up front, before any formatter output is produced.
        let antichain_size = self.probe.with_frontier(|antichain| antichain.len());

        let mut repr = f.debug_struct("RegisteredPlan");
        repr.field("frontier_len", &antichain_size);
        repr.finish_non_exhaustive()
    }
}
/// Field-wise clone of a [`RegisteredPlan`].
///
/// Written by hand so that the only extra requirement is `TraceHandle<Tr>: Clone`.
impl<T, Tr> Clone for RegisteredPlan<T, Tr>
where
    T: timely::progress::Timestamp,
    Tr: TraceReader,
    TraceHandle<Tr>: Clone,
{
    fn clone(&self) -> Self {
        let Self { probe, trace } = self;
        Self {
            probe: probe.clone(),
            trace: trace.clone(),
        }
    }
}
/// Something that can be consumed to produce a [`RegisteredPlan`] inside a scope `G`.
///
/// The trace type's `Time` must equal the scope's timestamp type.
pub trait BuildPlan<G, Tr>
where
G: timely::dataflow::Scope,
Tr: TraceReader<Time = G::Timestamp>,
{
/// Consumes the plan and returns the handles it produced, given mutable access to `scope`.
fn build(self, scope: &mut G) -> RegisteredPlan<G::Timestamp, Tr>;
}
/// Blanket implementation: any `FnOnce(&mut G) -> RegisteredPlan<..>` is a [`BuildPlan`].
impl<G, Tr, F> BuildPlan<G, Tr> for F
where
    G: timely::dataflow::Scope,
    Tr: TraceReader<Time = G::Timestamp>,
    F: FnOnce(&mut G) -> RegisteredPlan<G::Timestamp, Tr>,
{
    /// Invokes the closure once with `scope` and returns its result unchanged.
    fn build(self, scope: &mut G) -> RegisteredPlan<G::Timestamp, Tr> {
        self(scope)
    }
}
/// Error reported when a plan name is already present in the registry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlanAlreadyRegistered {
    /// The name that was already taken.
    pub name: String,
}

/// Human-readable message that embeds the conflicting name in single quotes.
impl std::fmt::Display for PlanAlreadyRegistered {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "build plan '{}' is already registered",
            self.name
        ))
    }
}

/// Marker implementation; no source error is provided.
impl std::error::Error for PlanAlreadyRegistered {}
/// Name-keyed collection of [`RegisteredPlan`]s.
///
/// Entries live in a `BTreeMap`, so iteration over names follows the map's key order.
pub struct BuildPlanRegistry<T, Tr>
where
T: timely::progress::Timestamp,
Tr: TraceReader,
{
// Plan name -> handles stored for that plan.
plans: BTreeMap<String, RegisteredPlan<T, Tr>>,
}
/// The default registry holds no plans.
///
/// Implemented by hand, with only the bounds the struct itself declares.
impl<T, Tr> Default for BuildPlanRegistry<T, Tr>
where
    T: timely::progress::Timestamp,
    Tr: TraceReader,
{
    fn default() -> Self {
        let plans = BTreeMap::new();
        Self { plans }
    }
}
/// Debug view that lists only the registered names, not the handles behind them.
impl<T, Tr> std::fmt::Debug for BuildPlanRegistry<T, Tr>
where
    T: timely::progress::Timestamp,
    Tr: TraceReader,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Borrow the keys in map order; the stored plans are left out of the output.
        let registered: Vec<&String> = self.plans.keys().collect();

        let mut repr = f.debug_struct("BuildPlanRegistry");
        repr.field("names", &registered);
        repr.finish()
    }
}
/// Registry operations.
///
/// Every method in this block requires `Tr::Time == T` and `TraceHandle<Tr>: Clone`,
/// because several of them hand out clones of the stored handles.
impl<T, Tr> BuildPlanRegistry<T, Tr>
where
    T: timely::progress::Timestamp,
    Tr: TraceReader<Time = T>,
    TraceHandle<Tr>: Clone,
{
    /// Creates an empty registry (same as [`Default::default`]).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `plan` under `name` if the name is not yet in use.
    ///
    /// On success, returns a clone of the handles that were stored.
    ///
    /// # Errors
    ///
    /// Returns [`PlanAlreadyRegistered`] carrying the conflicting name when an entry
    /// with that name already exists. The existing entry is left untouched and the
    /// supplied `plan` is dropped.
    pub fn register(
        &mut self,
        name: impl Into<String>,
        plan: RegisteredPlan<T, Tr>,
    ) -> Result<RegisteredPlan<T, Tr>, PlanAlreadyRegistered> {
        let name = name.into();
        match self.plans.entry(name) {
            Entry::Vacant(slot) => {
                // Clone before moving the original into the map so the caller gets
                // its own copy of the handles.
                let snapshot = plan.clone();
                slot.insert(plan);
                Ok(snapshot)
            }
            Entry::Occupied(slot) => Err(PlanAlreadyRegistered {
                name: slot.key().clone(),
            }),
        }
    }

    /// Builds `plan` inside `scope` and registers the result under `name`.
    ///
    /// The name is checked *before* the plan is built. When the name is already
    /// taken, `plan.build` is never invoked, so a rejected registration does not
    /// construct anything in `scope`.
    ///
    /// # Errors
    ///
    /// Returns [`PlanAlreadyRegistered`] when `name` is already in use.
    pub fn build_and_register<G, P>(
        &mut self,
        scope: &mut G,
        name: impl Into<String>,
        plan: P,
    ) -> Result<RegisteredPlan<T, Tr>, PlanAlreadyRegistered>
    where
        G: timely::dataflow::Scope<Timestamp = T>,
        P: BuildPlan<G, Tr>,
    {
        let name = name.into();
        // Reject duplicates up front: building first would run the builder against
        // `scope` only to throw the resulting handles away.
        if self.plans.contains_key(&name) {
            return Err(PlanAlreadyRegistered { name });
        }
        let plan = plan.build(scope);
        self.register(name, plan)
    }

    /// Returns a clone of the handles registered under `name`, or `None` if absent.
    #[must_use]
    pub fn get(&self, name: &str) -> Option<RegisteredPlan<T, Tr>> {
        self.plans.get(name).cloned()
    }

    /// Removes the entry for `name` and returns its handles, or `None` if absent.
    pub fn remove(&mut self, name: &str) -> Option<RegisteredPlan<T, Tr>> {
        self.plans.remove(name)
    }

    /// Stores `plan` under `name`, overwriting any existing entry.
    ///
    /// Returns a clone of the newly stored handles. A previously registered plan
    /// with the same name, if any, is dropped and not returned.
    pub fn replace(
        &mut self,
        name: impl Into<String>,
        plan: RegisteredPlan<T, Tr>,
    ) -> RegisteredPlan<T, Tr> {
        let name = name.into();
        let snapshot = plan.clone();
        self.plans.insert(name, plan);
        snapshot
    }

    /// Iterates over the registered names in the underlying map's key order.
    pub fn names(&self) -> impl ExactSizeIterator<Item = &str> {
        self.plans.keys().map(String::as_str)
    }

    /// Number of registered plans.
    #[must_use]
    pub fn len(&self) -> usize {
        self.plans.len()
    }

    /// Returns `true` when no plan is registered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.plans.is_empty()
    }

    /// Removes every entry, dropping the stored handles.
    pub fn clear(&mut self) {
        self.plans.clear();
    }
}
#[cfg(test)]
mod tests {
// Unit tests for `BuildPlanRegistry`. Every test runs inside
// `timely::execute_directly` and builds its plans with `build_plan` below.
use timely::dataflow::operators::Probe;
use super::{BuildPlanRegistry, PlanAlreadyRegistered, RegisteredPlan};
use crate::{
input::Input, operators::arrange::ArrangeBySelf, palimpsest::Lsn, trace::TraceReader,
};
// Trace type used by all tests: `u64` keys, `Lsn` timestamps, `isize` diffs.
type TestTrace = crate::trace::implementations::KeySpine<u64, Lsn, isize>;
// Test helper: creates a collection from `seed`, arranges it by self, attaches a
// probe to the arranged stream, and returns the probe together with the trace.
fn build_plan<G>(scope: &mut G, seed: Vec<u64>) -> RegisteredPlan<Lsn, TestTrace>
where
G: timely::dataflow::Scope<Timestamp = Lsn>,
{
// Only the collection half (`.1`) of the returned pair is kept.
let collection = scope.new_collection_from(seed).1;
let arranged = collection.arrange_by_self();
let probe = arranged.stream.probe();
RegisteredPlan {
probe,
trace: arranged.trace,
}
}
// A first registration succeeds, the name becomes visible through `len`/`names`,
// and the returned handles are usable.
#[test]
fn register_stores_handles_under_canonical_name() {
timely::execute_directly(|worker| {
let mut registry = BuildPlanRegistry::<Lsn, TestTrace>::new();
let plan = worker.dataflow::<Lsn, _, _>(|scope| build_plan(scope, vec![1_u64, 2, 3]));
let registered = registry
.register("all-posts", plan)
.expect("first register should succeed");
assert_eq!(registry.len(), 1);
assert_eq!(registry.names().collect::<Vec<_>>(), ["all-posts"]);
// The probe's frontier is expected to hold exactly one element at this point.
let frontier_len = registered.probe.with_frontier(|frontier| frontier.len());
assert_eq!(frontier_len, 1);
// Exercise the cloned trace handle; the returned value itself is not checked.
let mut clone = registered.trace.clone();
let _ = clone.get_logical_compaction();
});
}
// Registering a second plan under an existing name yields `PlanAlreadyRegistered`
// carrying that name.
#[test]
fn duplicate_name_returns_error() {
timely::execute_directly(|worker| {
let mut registry = BuildPlanRegistry::<Lsn, TestTrace>::new();
let first = worker.dataflow::<Lsn, _, _>(|scope| build_plan(scope, vec![1_u64]));
registry.register("q", first).unwrap();
let second = worker.dataflow::<Lsn, _, _>(|scope| build_plan(scope, vec![2_u64]));
let err = registry
.register("q", second)
.expect_err("duplicate registration must fail");
assert_eq!(err, PlanAlreadyRegistered { name: "q".into() });
});
}
// `replace` keeps a single entry under the same name; `clear` empties the registry.
#[test]
fn replace_overwrites_and_clear_drops_all_plans() {
timely::execute_directly(|worker| {
let mut registry = BuildPlanRegistry::<Lsn, TestTrace>::new();
let first = worker.dataflow::<Lsn, _, _>(|scope| build_plan(scope, vec![1_u64]));
registry.register("q", first).unwrap();
let replacement = worker.dataflow::<Lsn, _, _>(|scope| build_plan(scope, vec![5_u64]));
registry.replace("q", replacement);
assert_eq!(registry.len(), 1);
assert!(registry.get("q").is_some());
registry.clear();
assert!(registry.is_empty());
});
}
// A plain closure is accepted as a `BuildPlan` by `build_and_register`.
#[test]
fn build_and_register_accepts_a_closure() {
timely::execute_directly(|worker| {
let mut registry = BuildPlanRegistry::<Lsn, TestTrace>::new();
// The registry is borrowed inside the dataflow-construction closure so the plan
// is built against the same scope it is registered from.
worker.dataflow::<Lsn, _, _>(|scope| {
registry
.build_and_register(scope, "via-closure", |scope: &mut _| {
build_plan(scope, vec![10_u64])
})
.expect("first register via closure");
});
assert!(registry.get("via-closure").is_some());
});
}
}