zrx_stream/stream/workflow/builder.rs
1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Workflow builder.
27
28use std::error::Error;
29use std::marker::PhantomData;
30
31use zrx_scheduler::schedule::{self, Shared};
32use zrx_scheduler::{Id, Value};
33
34use crate::stream::Stream;
35
36use super::Workflow;
37
38// ----------------------------------------------------------------------------
39// Structs
40// ----------------------------------------------------------------------------
41
42/// Workflow builder.
43#[derive(Debug, PartialEq, Eq)]
44pub struct Builder<I> {
45 /// Schedule builder.
46 schedule: Shared<schedule::Builder<I>>,
47}
48
49// ----------------------------------------------------------------------------
50// Implementations
51// ----------------------------------------------------------------------------
52
53impl<I> Workflow<I>
54where
55 I: Id,
56{
57 /// Creates a workflow builder.
58 ///
59 /// If possible, it's recommended to use [`Workflow::with`] instead, as it
60 /// creates a [`Builder`] which is only valid for the function lifetime.
61 #[inline]
62 #[must_use]
63 pub fn builder() -> Builder<I> {
64 Builder::default()
65 }
66
67 /// Creates a workflow from the builder passed to the given function.
68 ///
69 /// # Errors
70 ///
71 /// If the given function returns an error, it is returned.
72 #[allow(clippy::missing_panics_doc)]
73 pub fn with<F, E>(f: F) -> Result<Self, E>
74 where
75 F: FnOnce(Builder<I>) -> Result<(), E>,
76 E: Error,
77 {
78 let builder = Builder::default();
79 f(builder.clone()).map(|()| {
80 let Builder { schedule } = builder;
81 let builder = schedule.try_into_inner().expect("invariant");
82 // We can safely use expect here, since the builder can't be cloned
83 // in the function, and is thus guaranteed to be the only reference
84 Self { schedule: builder.build() }
85 })
86 }
87}
88
89// ----------------------------------------------------------------------------
90
91impl<I> Builder<I>
92where
93 I: Id,
94{
95 /// Adds a stream to the workflow.
96 #[inline]
97 pub fn add<T>(&self) -> Stream<I, T>
98 where
99 T: Value,
100 {
101 Stream {
102 id: self.schedule.with_mut(schedule::Builder::add_source::<T>),
103 workflow: self.clone(),
104 marker: PhantomData,
105 }
106 }
107
108 /// Attempts to unwrap the inner value.
109 ///
110 /// This method tries to unwrap the inner schedule builder, which will only
111 /// succeed if there's exactly one strong reference remaining.
112 ///
113 /// # Errors
114 ///
115 /// This method returns `Self` if there's more than one strong reference.
116 pub fn build(self) -> Result<Workflow<I>, Self> {
117 self.schedule
118 .try_into_inner()
119 .map(|builder| Workflow { schedule: builder.build() })
120 .map_err(|schedule| Self { schedule })
121 }
122
123 /// Mutably borrows the schedule builder.
124 pub(crate) fn with<F, R>(&self, f: F) -> R
125 where
126 F: FnOnce(&mut schedule::Builder<I>) -> R,
127 {
128 self.schedule.with_mut(f)
129 }
130}
131
132impl<I> Builder<I> {
133 /// Clones the builder.
134 pub(crate) fn clone(&self) -> Self {
135 Self {
136 schedule: self.schedule.clone(),
137 }
138 }
139}
140
141// ----------------------------------------------------------------------------
142// Trait implementations
143// ----------------------------------------------------------------------------
144
145impl<I> Default for Builder<I> {
146 /// Creates a workflow builder.
147 #[inline]
148 fn default() -> Self {
149 Self { schedule: Shared::default() }
150 }
151}