zrx_stream/stream/workspace/workflow.rs
1// Copyright (c) 2025 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Workflow.
27
28use std::any::Any;
29use std::cell::RefCell;
30use std::marker::PhantomData;
31use std::rc::Rc;
32use zrx_scheduler::{Id, Value};
33
34use crate::stream::operator::Operator;
35use crate::stream::Stream;
36
37use super::traits::With;
38use super::WorkspaceRef;
39
40mod schedulable;
41
42pub use schedulable::Schedulable;
43
44// ----------------------------------------------------------------------------
45// Structs
46// ----------------------------------------------------------------------------
47
48/// Workflow.
49#[derive(Debug)]
50pub struct Workflow<I> {
51 /// Shared inner state.
52 inner: Rc<RefCell<WorkflowInner<I>>>,
53}
54
55/// Workflow inner state.
56#[derive(Debug)]
57pub struct WorkflowInner<I> {
58 /// Identifier.
59 id: usize,
60 /// Associated workspace.
61 workspace: WorkspaceRef<I>,
62}
63
64// ----------------------------------------------------------------------------
65// Implementations
66// ----------------------------------------------------------------------------
67
68impl<I> Workflow<I>
69where
70 I: std::fmt::Debug,
71{
72 /// Creates a new workflow.
73 #[must_use]
74 pub fn new(id: usize, workspace: WorkspaceRef<I>) -> Self {
75 Self {
76 inner: Rc::new(RefCell::new(WorkflowInner { id, workspace })),
77 }
78 }
79
80 /// Adds a source stream.
81 #[allow(clippy::missing_panics_doc)]
82 #[must_use]
83 pub fn add_source<T>(&self) -> Stream<I, T>
84 where
85 T: Any,
86 {
87 let id = self.with(|workflow| {
88 let workspace = workflow.workspace.upgrade().expect("invariant");
89 workspace.add_source::<T>()
90 });
91 Stream {
92 id,
93 workflow: self.clone(),
94 marker: PhantomData,
95 }
96 }
97
98 /// Adds an operator to the workflow.
99 #[allow(clippy::missing_panics_doc)]
100 pub fn add_operator<S, O, T, U>(&self, from: S, operator: O) -> Stream<I, U>
101 where
102 I: Id,
103 S: IntoIterator<Item = usize>,
104 O: Operator<I, T> + 'static,
105 T: Value,
106 U: Value,
107 {
108 let id = self.with(|workflow| {
109 let workspace = workflow.workspace.upgrade().expect("invariant");
110 workspace.add_action::<U, _, _>(from, Schedulable::new(operator))
111 });
112 Stream {
113 id,
114 workflow: self.clone(),
115 marker: PhantomData,
116 }
117 }
118}
119
120#[allow(clippy::must_use_candidate)]
121impl<I> Workflow<I> {
122 /// Returns the identifier of the workflow.
123 pub fn id(&self) -> usize {
124 self.with(|workflow| workflow.id)
125 }
126}
127
128// ----------------------------------------------------------------------------
129// Trait implementations
130// ----------------------------------------------------------------------------
131
132impl<I> With for Workflow<I> {
133 type Item = WorkflowInner<I>;
134
135 /// Returns a reference to the inner state.
136 #[inline]
137 fn inner(&self) -> &RefCell<Self::Item> {
138 &self.inner
139 }
140}
141
142// ----------------------------------------------------------------------------
143
144impl<I> Clone for Workflow<I> {
145 /// Returns a copy of the workflow.
146 #[inline]
147 fn clone(&self) -> Self {
148 Self { inner: self.inner.clone() }
149 }
150}