zrx_stream/stream/workspace/
workflow.rs

1// Copyright (c) 2025 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Workflow.
27
28use std::any::Any;
29use std::cell::RefCell;
30use std::marker::PhantomData;
31use std::rc::Rc;
32use zrx_scheduler::{Id, Value};
33
34use crate::stream::operator::Operator;
35use crate::stream::Stream;
36
37use super::traits::With;
38use super::WorkspaceRef;
39
40mod schedulable;
41
42pub use schedulable::Schedulable;
43
44// ----------------------------------------------------------------------------
45// Structs
46// ----------------------------------------------------------------------------
47
48/// Workflow.
49#[derive(Debug)]
50pub struct Workflow<I> {
51    /// Shared inner state.
52    inner: Rc<RefCell<WorkflowInner<I>>>,
53}
54
55/// Workflow inner state.
56#[derive(Debug)]
57pub struct WorkflowInner<I> {
58    /// Identifier.
59    id: usize,
60    /// Associated workspace.
61    workspace: WorkspaceRef<I>,
62}
63
64// ----------------------------------------------------------------------------
65// Implementations
66// ----------------------------------------------------------------------------
67
68impl<I> Workflow<I>
69where
70    I: std::fmt::Debug,
71{
72    /// Creates a new workflow.
73    #[must_use]
74    pub fn new(id: usize, workspace: WorkspaceRef<I>) -> Self {
75        Self {
76            inner: Rc::new(RefCell::new(WorkflowInner { id, workspace })),
77        }
78    }
79
80    /// Adds a source stream.
81    #[allow(clippy::missing_panics_doc)]
82    #[must_use]
83    pub fn add_source<T>(&self) -> Stream<I, T>
84    where
85        T: Any,
86    {
87        let id = self.with(|workflow| {
88            let workspace = workflow.workspace.upgrade().expect("invariant");
89            workspace.add_source::<T>()
90        });
91        Stream {
92            id,
93            workflow: self.clone(),
94            marker: PhantomData,
95        }
96    }
97
98    /// Adds an operator to the workflow.
99    #[allow(clippy::missing_panics_doc)]
100    pub fn add_operator<S, O, T, U>(&self, from: S, operator: O) -> Stream<I, U>
101    where
102        I: Id,
103        S: IntoIterator<Item = usize>,
104        O: Operator<I, T> + 'static,
105        T: Value,
106        U: Value,
107    {
108        let id = self.with(|workflow| {
109            let workspace = workflow.workspace.upgrade().expect("invariant");
110            workspace.add_action::<U, _, _>(from, Schedulable::new(operator))
111        });
112        Stream {
113            id,
114            workflow: self.clone(),
115            marker: PhantomData,
116        }
117    }
118}
119
120#[allow(clippy::must_use_candidate)]
121impl<I> Workflow<I> {
122    /// Returns the identifier of the workflow.
123    pub fn id(&self) -> usize {
124        self.with(|workflow| workflow.id)
125    }
126}
127
128// ----------------------------------------------------------------------------
129// Trait implementations
130// ----------------------------------------------------------------------------
131
132impl<I> With for Workflow<I> {
133    type Item = WorkflowInner<I>;
134
135    /// Returns a reference to the inner state.
136    #[inline]
137    fn inner(&self) -> &RefCell<Self::Item> {
138        &self.inner
139    }
140}
141
142// ----------------------------------------------------------------------------
143
144impl<I> Clone for Workflow<I> {
145    /// Returns a copy of the workflow.
146    #[inline]
147    fn clone(&self) -> Self {
148        Self { inner: self.inner.clone() }
149    }
150}