uni_plugin/traits/algorithm.rs
1//! Graph algorithm plugins.
2//!
3//! Two surfaces: [`AlgorithmProvider`] for black-box algorithms (the
4//! existing `uni-algo` library style), and [`PregelProgramProvider`] for
5//! vertex-program-style algorithms the host's Pregel executor runs.
6
7use arrow_array::ArrayRef;
8use arrow_schema::DataType;
9use datafusion::execution::SendableRecordBatchStream;
10use smol_str::SmolStr;
11
12use crate::errors::FnError;
13
14/// Static signature of an algorithm.
15#[derive(Clone, Debug)]
16pub struct AlgorithmSignature {
17 /// Output column schema.
18 pub output_fields: Vec<arrow_schema::Field>,
19 /// Markdown docs.
20 pub docs: String,
21}
22
23/// Per-invocation context passed to an [`AlgorithmProvider`].
24///
25/// `host` is an opaque [`AlgorithmHost`] callback the host populates
26/// when invoking the algorithm. Algorithms that need a concrete
27/// graph-projection / storage handle downcast through `host` rather
28/// than depend on `uni-store` / `uni-algo` types directly — this keeps
29/// `uni-plugin` free of upward dependencies.
30#[non_exhaustive]
31pub struct AlgorithmContext<'a> {
32 /// JSON-serialized algorithm configuration.
33 pub config_json: &'a str,
34 /// Optional opaque host handle. `None` when no host is bound — the
35 /// algorithm may fall back to a config-only path or surface an
36 /// `Unbound` error.
37 pub host: Option<&'a dyn AlgorithmHost>,
38}
39
40impl std::fmt::Debug for AlgorithmContext<'_> {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("AlgorithmContext")
43 .field("config_json", &self.config_json)
44 .field("host_bound", &self.host.is_some())
45 .finish()
46 }
47}
48
49impl<'a> AlgorithmContext<'a> {
50 /// Construct an `AlgorithmContext` with no host bound.
51 #[must_use]
52 pub fn new(config_json: &'a str) -> Self {
53 Self {
54 config_json,
55 host: None,
56 }
57 }
58
59 /// Attach a host handle.
60 #[must_use]
61 pub fn with_host(mut self, host: &'a dyn AlgorithmHost) -> Self {
62 self.host = Some(host);
63 self
64 }
65}
66
/// Opaque host callback surfacing graph access to plugin algorithms.
///
/// Hosts implement this trait. Bridges (e.g. `uni-plugin-builtin`)
/// downcast via [`AlgorithmHost::as_any`], or the `downcast_ref`
/// shorthand available on `dyn AlgorithmHost`, to recover the concrete
/// host type and its `StorageManager` / `L0Manager` handles. This keeps
/// `uni-plugin` free of upward dependencies on `uni-store` / `uni-algo`.
pub trait AlgorithmHost: Send + Sync {
    /// Downcast hook exposing the concrete host type.
    ///
    /// Implementors (the hosts) typically return `self`. That coercion
    /// requires the host type to be `'static`:
    ///
    /// ```ignore
    /// fn as_any(&self) -> &dyn std::any::Any { self }
    /// ```
    fn as_any(&self) -> &dyn std::any::Any;
}

// Generic over the object lifetime: `&'a dyn AlgorithmHost` (as stored in
// `AlgorithmContext`) defaults to `dyn AlgorithmHost + 'a`, not `'static`.
impl<'h> dyn AlgorithmHost + 'h {
    /// Attempts to view this host as the concrete type `T`.
    ///
    /// Shorthand for `self.as_any().downcast_ref::<T>()`. Returns `None`
    /// when the host is not a `T`.
    #[must_use]
    pub fn downcast_ref<T: std::any::Any>(&self) -> Option<&T> {
        self.as_any().downcast_ref::<T>()
    }
}
78
79/// A black-box graph algorithm.
80///
81/// The trait is intentionally minimal: a signature describing the output,
82/// plus a `run` method returning a streaming `RecordBatch` sequence. The
83/// algorithm is responsible for fetching graph data via host APIs (out of
84/// scope of this trait — `uni-algo` will provide a `GraphView` abstraction
85/// the host adapter passes via `AlgorithmContext` once those APIs are
86/// available).
87pub trait AlgorithmProvider: Send + Sync {
88 /// Static signature.
89 fn signature(&self) -> &AlgorithmSignature;
90
91 /// Execute the algorithm.
92 ///
93 /// # Errors
94 ///
95 /// Returns [`FnError`] if the algorithm cannot be started; per-batch
96 /// failures are signaled via `Err` items in the returned stream.
97 fn run(&self, ctx: AlgorithmContext<'_>) -> Result<SendableRecordBatchStream, FnError>;
98}
99
100/// Signature of a Pregel-style vertex program.
101#[derive(Clone, Debug)]
102pub struct PregelSignature {
103 /// Per-vertex state column type.
104 pub state_type: DataType,
105 /// Message column type.
106 pub message_type: DataType,
107 /// Synchronization model.
108 pub aggregation_mode: AggregationMode,
109 /// Optional hard cap on supersteps.
110 pub max_supersteps: Option<u64>,
111}
112
/// Synchronization model for Pregel programs.
///
/// Derives `Hash` alongside `Eq` so hosts can key lookup tables by mode.
/// Downstream crates cannot add that impl themselves because of the
/// orphan rule.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum AggregationMode {
    /// Bulk Synchronous Parallel — classic Pregel.
    Bsp,
    /// Asynchronous with shared state.
    AsyncShared,
    /// Asynchronous via point-to-point messaging.
    AsyncMessaging,
}
124
125/// Outcome of a vertex's `compute` step.
126#[derive(Debug)]
127pub struct ComputeOutcome {
128 /// Whether this vertex votes to halt.
129 pub halt: bool,
130 /// Outgoing messages, addressed to neighbor vertices.
131 pub outgoing: Vec<(SmolStr, ArrayRef)>,
132}
133
/// Statistics surfaced to the Pregel host between supersteps.
///
/// A plain `Copy` value type. `PartialEq`/`Eq` let hosts and tests compare
/// snapshots directly.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PregelStats {
    /// Active vertices in the current superstep.
    pub active_vertices: u64,
    /// Messages sent in the previous superstep.
    pub messages_sent: u64,
    /// Wall-clock duration of the previous superstep, milliseconds.
    pub last_superstep_ms: u64,
}
144
145/// A Pregel-style vertex program plugin.
146///
147/// Detailed `init` / `compute` / `combine` signatures will land alongside
148/// the Pregel executor in `uni-algo` during M5c. The trait is in place so
149/// plugin authors can build against the surface from M1; full integration
150/// follows.
151pub trait PregelProgramProvider: Send + Sync {
152 /// Static signature.
153 fn signature(&self) -> &PregelSignature;
154
155 /// Optional global halt condition consulted between supersteps.
156 fn halt(&self, _superstep: u64, _stats: &PregelStats) -> bool {
157 false
158 }
159}