Skip to main content

uni_plugin/traits/
algorithm.rs

1//! Graph algorithm plugins.
2//!
3//! Two surfaces: [`AlgorithmProvider`] for black-box algorithms (the
4//! existing `uni-algo` library style), and [`PregelProgramProvider`] for
5//! vertex-program-style algorithms the host's Pregel executor runs.
6
7use arrow_array::ArrayRef;
8use arrow_schema::DataType;
9use datafusion::execution::SendableRecordBatchStream;
10use smol_str::SmolStr;
11
12use crate::errors::FnError;
13
14/// Static signature of an algorithm.
15#[derive(Clone, Debug)]
16pub struct AlgorithmSignature {
17    /// Output column schema.
18    pub output_fields: Vec<arrow_schema::Field>,
19    /// Markdown docs.
20    pub docs: String,
21}
22
23/// Per-invocation context passed to an [`AlgorithmProvider`].
24///
25/// `host` is an opaque [`AlgorithmHost`] callback the host populates
26/// when invoking the algorithm. Algorithms that need a concrete
27/// graph-projection / storage handle downcast through `host` rather
28/// than depend on `uni-store` / `uni-algo` types directly — this keeps
29/// `uni-plugin` free of upward dependencies.
30#[non_exhaustive]
31pub struct AlgorithmContext<'a> {
32    /// JSON-serialized algorithm configuration.
33    pub config_json: &'a str,
34    /// Optional opaque host handle. `None` when no host is bound — the
35    /// algorithm may fall back to a config-only path or surface an
36    /// `Unbound` error.
37    pub host: Option<&'a dyn AlgorithmHost>,
38}
39
40impl std::fmt::Debug for AlgorithmContext<'_> {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("AlgorithmContext")
43            .field("config_json", &self.config_json)
44            .field("host_bound", &self.host.is_some())
45            .finish()
46    }
47}
48
49impl<'a> AlgorithmContext<'a> {
50    /// Construct an `AlgorithmContext` with no host bound.
51    #[must_use]
52    pub fn new(config_json: &'a str) -> Self {
53        Self {
54            config_json,
55            host: None,
56        }
57    }
58
59    /// Attach a host handle.
60    #[must_use]
61    pub fn with_host(mut self, host: &'a dyn AlgorithmHost) -> Self {
62        self.host = Some(host);
63        self
64    }
65}
66
/// Opaque callback through which a host surfaces graph access to plugin
/// algorithms.
///
/// The host supplies the implementation. Bridges such as
/// `uni-plugin-builtin` go through [`AlgorithmHost::as_any`] and downcast to
/// recover the concrete host type together with its `StorageManager` /
/// `L0Manager` handles. Routing access through this trait keeps
/// `uni-plugin` free of upward dependencies on `uni-store` / `uni-algo`.
pub trait AlgorithmHost: Send + Sync {
    /// Downcast hook: exposes the implementor as [`std::any::Any`] so the
    /// concrete host type can be recovered.
    fn as_any(&self) -> &dyn std::any::Any;
}
78
79/// A black-box graph algorithm.
80///
81/// The trait is intentionally minimal: a signature describing the output,
82/// plus a `run` method returning a streaming `RecordBatch` sequence. The
83/// algorithm is responsible for fetching graph data via host APIs (out of
84/// scope of this trait — `uni-algo` will provide a `GraphView` abstraction
85/// the host adapter passes via `AlgorithmContext` once those APIs are
86/// available).
87pub trait AlgorithmProvider: Send + Sync {
88    /// Static signature.
89    fn signature(&self) -> &AlgorithmSignature;
90
91    /// Execute the algorithm.
92    ///
93    /// # Errors
94    ///
95    /// Returns [`FnError`] if the algorithm cannot be started; per-batch
96    /// failures are signaled via `Err` items in the returned stream.
97    fn run(&self, ctx: AlgorithmContext<'_>) -> Result<SendableRecordBatchStream, FnError>;
98}
99
100/// Signature of a Pregel-style vertex program.
101#[derive(Clone, Debug)]
102pub struct PregelSignature {
103    /// Per-vertex state column type.
104    pub state_type: DataType,
105    /// Message column type.
106    pub message_type: DataType,
107    /// Synchronization model.
108    pub aggregation_mode: AggregationMode,
109    /// Optional hard cap on supersteps.
110    pub max_supersteps: Option<u64>,
111}
112
/// Synchronization model for Pregel programs.
///
/// The enum is a plain, fieldless tag. It derives `Hash` alongside `Eq` so
/// that a mode can serve as a key in hashed collections (`HashMap`,
/// `HashSet`); downstream crates cannot add that impl themselves.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum AggregationMode {
    /// Bulk Synchronous Parallel — classic Pregel.
    Bsp,
    /// Asynchronous with shared state.
    AsyncShared,
    /// Asynchronous via point-to-point messaging.
    AsyncMessaging,
}
124
125/// Outcome of a vertex's `compute` step.
126#[derive(Debug)]
127pub struct ComputeOutcome {
128    /// Whether this vertex votes to halt.
129    pub halt: bool,
130    /// Outgoing messages, addressed to neighbor vertices.
131    pub outgoing: Vec<(SmolStr, ArrayRef)>,
132}
133
/// Statistics surfaced to the Pregel host between supersteps.
///
/// All fields are plain counters, so the type is `Copy` and defaults to
/// all zeros. It also derives `PartialEq` / `Eq` so that two snapshots can
/// be compared directly, for example with `==` or `assert_eq!`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PregelStats {
    /// Active vertices in the current superstep.
    pub active_vertices: u64,
    /// Messages sent in the previous superstep.
    pub messages_sent: u64,
    /// Wall-clock duration of the previous superstep, milliseconds.
    pub last_superstep_ms: u64,
}
144
145/// A Pregel-style vertex program plugin.
146///
147/// Detailed `init` / `compute` / `combine` signatures will land alongside
148/// the Pregel executor in `uni-algo` during M5c. The trait is in place so
149/// plugin authors can build against the surface from M1; full integration
150/// follows.
151pub trait PregelProgramProvider: Send + Sync {
152    /// Static signature.
153    fn signature(&self) -> &PregelSignature;
154
155    /// Optional global halt condition consulted between supersteps.
156    fn halt(&self, _superstep: u64, _stats: &PregelStats) -> bool {
157        false
158    }
159}