use std::sync::Arc;
use graphrefly_core::{Core, HandleId, NodeId, OperatorOp, OperatorOpts, NO_HANDLE};
use crate::binding::OperatorBinding;
use crate::error::OperatorFactoryError;
use crate::transform::{filter, OperatorRegistration};
/// Result of registering a flow operator that carries no user callback.
///
/// The only payload is the [`NodeId`] handed back by the core when the
/// operator node was registered.
#[derive(Debug, Clone, Copy)]
#[must_use = "the flow operator's NodeId is the value of registering it"]
pub struct FlowRegistration {
    /// Id of the operator node that was registered.
    pub node: NodeId,
}
impl FlowRegistration {
    /// Consumes the registration and yields the wrapped [`NodeId`].
    #[must_use]
    pub fn into_node(self) -> NodeId {
        let Self { node } = self;
        node
    }
}
impl From<FlowRegistration> for NodeId {
fn from(r: FlowRegistration) -> Self {
r.node
}
}
/// Registers a `Take { count }` operator on `source` using
/// `OperatorOpts::default()`.
///
/// Thin convenience wrapper around [`take_with`].
///
/// # Panics
///
/// Panics under the same conditions as [`take_with`].
pub fn take(core: &Core, source: NodeId, count: u32) -> FlowRegistration {
    let default_opts = OperatorOpts::default();
    take_with(core, source, count, default_opts)
}
/// Registers a `Take { count }` operator whose single dependency is
/// `source`, with caller-supplied `opts`.
///
/// NOTE(review): the runtime meaning of `Take` (presumably "forward the
/// first `count` items") is defined in `graphrefly_core`, not here.
///
/// # Panics
///
/// Panics if `Core::register_operator` returns an error; this factory
/// treats that as a violated caller invariant.
pub fn take_with(core: &Core, source: NodeId, count: u32, opts: OperatorOpts) -> FlowRegistration {
    let op = OperatorOp::Take { count };
    let registered = core.register_operator(&[source], op, opts);
    FlowRegistration {
        node: registered.expect(
            "invariant: caller has validated dep ids and seed before calling register_operator",
        ),
    }
}
/// Registers a `Skip { count }` operator on `source` using
/// `OperatorOpts::default()`.
///
/// Thin convenience wrapper around [`skip_with`].
///
/// # Panics
///
/// Panics under the same conditions as [`skip_with`].
pub fn skip(core: &Core, source: NodeId, count: u32) -> FlowRegistration {
    let default_opts = OperatorOpts::default();
    skip_with(core, source, count, default_opts)
}
/// Registers a `Skip { count }` operator whose single dependency is
/// `source`, with caller-supplied `opts`.
///
/// NOTE(review): the runtime meaning of `Skip` (presumably "drop the first
/// `count` items") is defined in `graphrefly_core`, not here.
///
/// # Panics
///
/// Panics if `Core::register_operator` returns an error; this factory
/// treats that as a violated caller invariant.
pub fn skip_with(core: &Core, source: NodeId, count: u32, opts: OperatorOpts) -> FlowRegistration {
    let op = OperatorOp::Skip { count };
    let registered = core.register_operator(&[source], op, opts);
    FlowRegistration {
        node: registered.expect(
            "invariant: caller has validated dep ids and seed before calling register_operator",
        ),
    }
}
/// Registers a `TakeWhile` operator on `source` driven by `predicate`,
/// using `OperatorOpts::default()`.
///
/// Thin convenience wrapper around [`take_while_with`].
///
/// # Panics
///
/// Panics under the same conditions as [`take_while_with`].
pub fn take_while<F>(
    core: &Core,
    binding: &Arc<dyn OperatorBinding>,
    source: NodeId,
    predicate: F,
) -> OperatorRegistration
where
    F: Fn(HandleId) -> bool + Send + Sync + 'static,
{
    let default_opts = OperatorOpts::default();
    take_while_with(core, binding, source, predicate, default_opts)
}
/// Registers a `TakeWhile` operator on `source` with caller-supplied `opts`.
///
/// The predicate is first boxed and registered with `binding`, which yields
/// a function id; that id is then embedded in the `TakeWhile` op passed to
/// the core. Both the resulting node id and the function id are returned.
///
/// # Panics
///
/// Panics if `Core::register_operator` returns an error; this factory
/// treats that as a violated caller invariant. Note that the predicate has
/// already been registered with `binding` by that point.
pub fn take_while_with<F>(
    core: &Core,
    binding: &Arc<dyn OperatorBinding>,
    source: NodeId,
    predicate: F,
    opts: OperatorOpts,
) -> OperatorRegistration
where
    F: Fn(HandleId) -> bool + Send + Sync + 'static,
{
    // The predicate must be registered before the operator so its id can be
    // carried inside the op.
    let fn_id = binding.register_predicate(Box::new(predicate));
    let op = OperatorOp::TakeWhile { fn_id };
    let registered = core.register_operator(&[source], op, opts);
    OperatorRegistration {
        node: registered.expect(
            "invariant: caller has validated dep ids and seed before calling register_operator",
        ),
        fn_id,
    }
}
/// Registers a `Last` operator on `source` with no default value, using
/// `OperatorOpts::default()`.
///
/// Thin convenience wrapper around [`last_with`].
///
/// # Panics
///
/// Panics under the same conditions as [`last_with`].
pub fn last(core: &Core, source: NodeId) -> FlowRegistration {
    let default_opts = OperatorOpts::default();
    last_with(core, source, default_opts)
}
/// Registers a `Last` operator on `source` with caller-supplied `opts`.
///
/// The op's `default` field is set to `NO_HANDLE`, i.e. no default value is
/// provided. Use [`last_with_default_with`] to supply one.
///
/// # Panics
///
/// Panics if `Core::register_operator` returns an error; this factory
/// treats that as a violated caller invariant.
pub fn last_with(core: &Core, source: NodeId, opts: OperatorOpts) -> FlowRegistration {
    let op = OperatorOp::Last { default: NO_HANDLE };
    let registered = core.register_operator(&[source], op, opts);
    FlowRegistration {
        node: registered.expect("invariant: caller has validated dep id before calling last()"),
    }
}
/// Registers a `Last` operator on `source` with an explicit `default`
/// handle, using `OperatorOpts::default()`.
///
/// Thin convenience wrapper around [`last_with_default_with`].
///
/// # Errors
///
/// Returns whatever [`last_with_default_with`] returns for the same
/// arguments.
pub fn last_with_default(
    core: &Core,
    source: NodeId,
    default: HandleId,
) -> Result<FlowRegistration, OperatorFactoryError> {
    let default_opts = OperatorOpts::default();
    last_with_default_with(core, source, default, default_opts)
}
/// Registers a `Last` operator on `source` with an explicit `default`
/// handle and caller-supplied `opts`.
///
/// Unlike the other factories in this file, registration failures are
/// returned to the caller instead of panicking.
///
/// # Errors
///
/// * [`OperatorFactoryError::ZeroDefault`] if `default` equals `NO_HANDLE`;
///   nothing is registered in that case.
/// * Any error from `Core::register_operator`, converted into
///   [`OperatorFactoryError`].
pub fn last_with_default_with(
    core: &Core,
    source: NodeId,
    default: HandleId,
    opts: OperatorOpts,
) -> Result<FlowRegistration, OperatorFactoryError> {
    if default == NO_HANDLE {
        // `NO_HANDLE` is what `last_with` uses to mean "no default", so it is
        // rejected here as an explicit default.
        Err(OperatorFactoryError::ZeroDefault)
    } else {
        let op = OperatorOp::Last { default };
        let node = core.register_operator(&[source], op, opts)?;
        Ok(FlowRegistration { node })
    }
}
/// Shorthand for [`take`] with a count of `1`.
///
/// # Panics
///
/// Panics under the same conditions as [`take`].
pub fn first(core: &Core, source: NodeId) -> FlowRegistration {
    let single: u32 = 1;
    take(core, source, single)
}
/// Composes two operators: a `filter` on `source` using `predicate`,
/// followed by a [`take`] of `1` on the filter's node.
///
/// Only the registration of the final `take` node is returned; the
/// intermediate filter registration is not exposed to the caller.
///
/// # Panics
///
/// Panics under the same conditions as [`take`]. NOTE(review): `filter` is
/// defined in `crate::transform` and may have panic conditions of its own.
pub fn find<F>(
    core: &Core,
    binding: &Arc<dyn OperatorBinding>,
    source: NodeId,
    predicate: F,
) -> FlowRegistration
where
    F: Fn(HandleId) -> bool + Send + Sync + 'static,
{
    let matching = filter(core, binding, source, predicate).node;
    take(core, matching, 1)
}
/// Composes two operators: a [`skip`] of `index` on `source`, followed by a
/// [`take`] of `1` on the skip's node.
///
/// Only the registration of the final `take` node is returned.
///
/// # Panics
///
/// Panics under the same conditions as [`skip`] and [`take`].
pub fn element_at(core: &Core, source: NodeId, index: u32) -> FlowRegistration {
    let after_skip = skip(core, source, index).node;
    take(core, after_skip, 1)
}