use smallvec::SmallVec;
use crate::handle::{FnId, HandleId, NodeId, NO_HANDLE};
use crate::node::SubscriptionId;
/// One element of the `dep_data` slice passed to [`BindingBoundary::invoke_fn`].
///
/// Holds zero or more handles in `data` plus a fallback handle in `prev_data`;
/// see [`DepBatch::latest`] and [`DepBatch::is_sentinel`] for how the two combine.
#[derive(Clone, Debug)]
pub struct DepBatch {
/// Handles carried by this batch. The last element is what `latest()` reports.
/// Up to one handle is stored inline (`SmallVec<[HandleId; 1]>`).
pub data: SmallVec<[HandleId; 1]>,
/// Handle that `latest()` falls back to when `data` is empty. When this is
/// `NO_HANDLE` and `data` is empty, `is_sentinel()` returns `true`.
/// NOTE(review): presumably the handle from before this batch — confirm with the producer.
pub prev_data: HandleId,
/// NOTE(review): presumably whether this dependency took part in the current
/// invocation; nothing in the visible code reads it — confirm against the caller.
pub involved: bool,
}
impl DepBatch {
    /// Returns the newest handle known to this batch.
    ///
    /// This is the final element of `data`; when `data` is empty the call
    /// falls back to `prev_data`.
    #[must_use]
    pub fn latest(&self) -> HandleId {
        self.data.last().map_or(self.prev_data, |&newest| newest)
    }

    /// Reports whether the batch carries no handle at all, i.e. `data` is
    /// empty and `prev_data` equals `NO_HANDLE`.
    #[must_use]
    pub fn is_sentinel(&self) -> bool {
        self.data.is_empty() && self.prev_data == NO_HANDLE
    }
}
/// Reason value passed as the `_trigger` argument of [`BindingBoundary::cleanup_for`].
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names only; the code that fires them is not visible in this file.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum CleanupTrigger {
/// Presumably: cleanup requested because the node's function is about to run again — confirm.
OnRerun,
/// Presumably: cleanup requested because the node is being deactivated — confirm.
OnDeactivation,
/// Presumably: cleanup requested because the node was invalidated — confirm.
OnInvalidate,
}
/// A single entry of the `emissions` list in [`FnResult::Batch`].
///
/// The type is `#[must_use]`: as the attribute message states, a value may
/// contain handles that must be processed or released, so it should not be
/// silently dropped.
#[derive(Clone, Debug)]
#[must_use = "FnEmission may contain handles that must be processed or released"]
pub enum FnEmission {
/// Data emission carrying a handle (presumably to the emitted value).
Data(HandleId),
/// Completion emission; carries no handle.
Complete,
/// Error emission carrying a handle (presumably to the error value).
Error(HandleId),
}
/// Return value of [`BindingBoundary::invoke_fn`] and
/// [`BindingBoundary::invoke_fn_with_core`].
///
/// Every variant carries an optional `tracked` list of `usize` values.
/// NOTE(review): `tracked` presumably lists the dependency indices the function
/// read during the call, with `None` meaning "not reported" — confirm with the
/// code that consumes `FnResult`.
#[derive(Clone, Debug)]
pub enum FnResult {
/// The call produced exactly one handle.
Data {
/// The produced handle.
handle: HandleId,
/// See the type-level note on `tracked`.
tracked: Option<Vec<usize>>,
},
/// The call produced no handle and no emissions.
Noop {
/// See the type-level note on `tracked`.
tracked: Option<Vec<usize>>,
},
/// The call produced a sequence of emissions.
Batch {
/// The emissions, in the order the binding supplied them; up to two are
/// stored inline (`SmallVec<[FnEmission; 2]>`).
emissions: SmallVec<[FnEmission; 2]>,
/// See the type-level note on `tracked`.
tracked: Option<Vec<usize>>,
},
}
/// Callback surface implemented by a binding (the default-method panic
/// messages refer to the implementor as "this binding").
///
/// Values cross this interface only as opaque `HandleId`, `FnId` and `NodeId`
/// identifiers. Implementors must be `Send + Sync`.
///
/// Only [`invoke_fn`](Self::invoke_fn), [`custom_equals`](Self::custom_equals)
/// and [`release_handle`](Self::release_handle) are required. Every other
/// method has a default that is a no-op, returns `None`, or panics through
/// `unimplemented!` to signal that the binding lacks the capability.
///
/// NOTE(review): the `D009` / `D020` / `D166` / `D199` tags in the panic messages
/// look like design-decision identifiers; they are not defined in this file.
pub trait BindingBoundary: Send + Sync {
/// Invokes the binding function `fn_id` on behalf of node `node_id`, passing
/// the batches in `dep_data`, and returns the outcome as a [`FnResult`].
///
/// NOTE(review): `dep_data` presumably holds one batch per dependency of the
/// node, in dependency order — confirm with the caller.
fn invoke_fn(&self, node_id: NodeId, fn_id: FnId, dep_data: &[DepBatch]) -> FnResult;
/// Variant of [`invoke_fn`](Self::invoke_fn) that additionally receives a
/// `core` reference.
///
/// The default implementation ignores `core` and forwards the remaining
/// arguments to `invoke_fn` unchanged.
fn invoke_fn_with_core(
&self,
node_id: NodeId,
fn_id: FnId,
dep_data: &[DepBatch],
core: &dyn crate::node::CoreFull,
) -> FnResult {
// `core` is deliberately unused in the default; bind it to `_` to say so.
let _ = core;
self.invoke_fn(node_id, fn_id, dep_data)
}
/// Returns whether the binding considers handles `a` and `b` equal under the
/// comparison identified by `equals_handle`.
///
/// NOTE(review): the equality semantics are entirely up to the implementor.
fn custom_equals(&self, equals_handle: FnId, a: HandleId, b: HandleId) -> bool;
/// Tells the binding that `handle` is being released.
///
/// NOTE(review): presumably drops one reference the caller held on the handle;
/// the ownership protocol is not visible in this file.
fn release_handle(&self, handle: HandleId);
/// Hook for retaining `_handle` (presumably the counterpart of
/// [`release_handle`](Self::release_handle) — confirm). Default: no-op.
fn retain_handle(&self, _handle: HandleId) {}
/// Operator hook taking a function id and a slice of input handles and
/// returning a list of handles.
///
/// NOTE(review): presumably applies `_fn_id` to each input — confirm.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn project_each(&self, _fn_id: FnId, _inputs: &[HandleId]) -> SmallVec<[HandleId; 1]> {
unimplemented!("project_each: this binding does not support operators (D009)")
}
/// Operator hook taking a function id and a slice of input handles and
/// returning a list of booleans.
///
/// NOTE(review): presumably one predicate result per input — confirm.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn predicate_each(&self, _fn_id: FnId, _inputs: &[HandleId]) -> SmallVec<[bool; 4]> {
unimplemented!("predicate_each: this binding does not support operators (D009)")
}
/// Operator hook taking a function id, an accumulator handle and a slice of
/// input handles, and returning a list of handles.
///
/// NOTE(review): presumably the accumulator after each input — confirm.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn fold_each(
&self,
_fn_id: FnId,
_acc: HandleId,
_inputs: &[HandleId],
) -> SmallVec<[HandleId; 1]> {
unimplemented!("fold_each: this binding does not support operators (D009)")
}
/// Operator hook combining a previous and a current handle into one handle.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn pairwise_pack(&self, _fn_id: FnId, _prev: HandleId, _current: HandleId) -> HandleId {
unimplemented!("pairwise_pack: this binding does not support operators (D009)")
}
/// Combinator hook turning a slice of handles into a single handle.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn pack_tuple(&self, _fn_id: FnId, _handles: &[HandleId]) -> HandleId {
unimplemented!("pack_tuple: this binding does not support combinator operators (D020)")
}
/// Returns a handle for the given node id; per the panic message this is
/// needed by window operators.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn intern_node(&self, _node_id: NodeId) -> HandleId {
unimplemented!("intern_node: this binding does not support window operators")
}
/// Hook receiving a node id and an `_unsub` callback that accepts a
/// `(NodeId, SubscriptionId)` pair. Default: no-op.
///
/// NOTE(review): presumably lets the binding tear down subscriptions a
/// producer node created — confirm with the caller.
fn producer_deactivate(&self, _node_id: NodeId, _unsub: &dyn Fn(NodeId, SubscriptionId)) {}
/// Lets the binding supply an error handle for a pause-overflow condition on
/// `_node_id`, given the dropped count, the configured maximum and a
/// lock-held duration in milliseconds. Default: returns `None`.
///
/// NOTE(review): what the caller does on `None` is not visible here.
fn synthesize_pause_overflow_error(
&self,
_node_id: NodeId,
_dropped_count: u32,
_configured_max: usize,
_lock_held_duration_ms: u64,
) -> Option<HandleId> {
None
}
/// Cleanup hook for `_node_id`, tagged with the [`CleanupTrigger`] that
/// caused it. Default: no-op.
fn cleanup_for(&self, _node_id: NodeId, _trigger: CleanupTrigger) {}
/// Converts the value behind `_handle` to JSON, if the binding can.
/// Default: returns `None`.
fn serialize_handle(&self, _handle: HandleId) -> Option<serde_json::Value> {
None
}
/// Converts a JSON value back into a handle; per the panic message this is
/// used for snapshot restore.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn deserialize_value(&self, _value: serde_json::Value) -> HandleId {
unimplemented!("deserialize_value: this binding does not support snapshot restore (D166)")
}
/// Control-operator hook invoking function `_fn_id` with `_handle`; returns nothing.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn invoke_tap_fn(&self, _fn_id: FnId, _handle: HandleId) {
unimplemented!("invoke_tap_fn: this binding does not support control operators")
}
/// Control-operator hook invoking function `_fn_id` with `_handle`
/// (presumably an error handle, going by the method name); returns nothing.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn invoke_tap_error_fn(&self, _fn_id: FnId, _handle: HandleId) {
unimplemented!("invoke_tap_error_fn: this binding does not support control operators")
}
/// Control-operator hook invoking function `_fn_id` with no handle; returns nothing.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn invoke_tap_complete_fn(&self, _fn_id: FnId) {
unimplemented!("invoke_tap_complete_fn: this binding does not support control operators")
}
/// Control-operator hook invoking function `_fn_id` with `_handle` and
/// returning either a replacement handle or a unit error.
///
/// # Errors
///
/// NOTE(review): presumably `Err(())` means the rescue function itself
/// failed — confirm with the caller.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
// The `()` error type is intentional here, so clippy's `result_unit_err` lint is silenced.
#[allow(clippy::result_unit_err)]
fn invoke_rescue_fn(&self, _fn_id: FnId, _handle: HandleId) -> Result<HandleId, ()> {
unimplemented!("invoke_rescue_fn: this binding does not support control operators")
}
/// Hook for the stratify operator: invokes classifier `_fn_id` with a rules
/// handle and a value handle and returns a boolean.
///
/// NOTE(review): presumably `true` means the value matches the rules — confirm.
///
/// # Panics
///
/// The default implementation always panics (`unimplemented!`).
fn invoke_stratify_classifier_fn(
&self,
_fn_id: FnId,
_rules_handle: HandleId,
_value_handle: HandleId,
) -> bool {
unimplemented!("invoke_stratify_classifier_fn: this binding does not support the stratify operator (D199)")
}
/// Hook for `_node_id` with a no-op default.
///
/// NOTE(review): presumably clears per-node context held by the binding — confirm.
fn wipe_ctx(&self, _node_id: NodeId) {}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle::{FnId, HandleId, NodeId};
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Binding stub that tallies calls to each required trait method.
    // The shared `_count` suffix is deliberate, so the field-name lint is silenced.
    #[allow(clippy::struct_field_names)]
    #[derive(Default)]
    struct TestBinding {
        invoke_count: AtomicU64,
        equals_count: AtomicU64,
        release_count: AtomicU64,
    }

    /// Adds one to `counter`.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::SeqCst);
    }

    /// Reads the current value of `counter`.
    fn tally(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::SeqCst)
    }

    impl BindingBoundary for TestBinding {
        /// Echoes the latest handle of the first batch, or handle 99 when
        /// there are no batches.
        fn invoke_fn(&self, _node_id: NodeId, _fn_id: FnId, dep_data: &[DepBatch]) -> FnResult {
            bump(&self.invoke_count);
            let fallback = HandleId::new(99);
            let handle = match dep_data.first() {
                Some(batch) => batch.latest(),
                None => fallback,
            };
            FnResult::Data {
                handle,
                tracked: None,
            }
        }

        /// Plain handle identity comparison.
        fn custom_equals(&self, _equals_handle: FnId, a: HandleId, b: HandleId) -> bool {
            bump(&self.equals_count);
            a == b
        }

        fn release_handle(&self, _handle: HandleId) {
            bump(&self.release_count);
        }
    }

    /// Each required method reaches the implementation and is counted once per call.
    #[test]
    fn boundary_calls_route_correctly() {
        let binding = TestBinding::default();
        let seven = HandleId::new(7);
        let batch = DepBatch {
            data: smallvec::smallvec![seven],
            prev_data: NO_HANDLE,
            involved: true,
        };

        let produced = match binding.invoke_fn(NodeId::new(1), FnId::new(2), &[batch]) {
            FnResult::Data { handle, .. } => handle,
            FnResult::Noop { .. } | FnResult::Batch { .. } => panic!("expected Data variant"),
        };
        assert_eq!(produced, HandleId::new(7));

        let same = binding.custom_equals(FnId::new(3), HandleId::new(7), HandleId::new(7));
        assert!(same);
        let different = binding.custom_equals(FnId::new(3), HandleId::new(7), HandleId::new(8));
        assert!(!different);

        binding.release_handle(HandleId::new(7));

        assert_eq!(tally(&binding.invoke_count), 1);
        assert_eq!(tally(&binding.equals_count), 2);
        assert_eq!(tally(&binding.release_count), 1);
    }

    /// Compile-time check: both the concrete stub and the trait object are `Send + Sync`.
    #[test]
    fn binding_is_send_and_sync() {
        fn assert_send_sync<T: ?Sized + Send + Sync>() {}
        assert_send_sync::<TestBinding>();
        assert_send_sync::<dyn BindingBoundary>();
    }
}