1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
//! Remote operation traits.
//!
//! This module defines the core traits that enable users to define the
//! semantics of their system. Operations define the signature and semantics of
//! a computation. Akin to a function that maps an input to an output.
//!
//! Key components of this module include:
//! ## [`Operation`]
//! Represents a generic operation that can be executed by a remote machine.
//! It defines the signature and semantics of a computation.
//!
//! ## [`Monoid`]
//! Represents a binary [`Operation`] whose elements can be combined in an
//! associative manner. It's a specialized form of an [`Operation`] that has an
//! identity element and an associative binary operation. This makes it
//! _foldable_.
//!
//! ## [`Operation`] implementation of [`Monoid`]
//! A blanket implementation of the [`Operation`] trait is provided for every
//! [`Monoid`]. An [`Operation`] is strictly more general than a [`Monoid`], so
//! we can trivially derive an [`Operation`] for every [`Monoid`].
//!
//! ## [`OpKind`]
//! A trait used to create a registry of all available operations. It's used to
//! facilitate serialization, deserialization, and remote dynamic execution of
//! operations.
//!
//! # Notes
//!
//! - Operations are monomorphic over their implementing type. This is
//!   illustrated by the fact that the `Input` and `Output` types are
//!   _associated_ types of the [`Operation`] trait. This obviates the need to
//!   make [`OpKind`] enums generic and thus simplifies downstream code by
//!   removing the need for threading generic parameters through the system. It
//!   is, however, possible to implement [`Operation`]s for generic types. See
//!   the [example](#defining-a-polymorphic-monoid). They just must be
//!   monomorphized in the enum definition. It is not yet clear that
//!   polymorphism at the level of [`OpKind`] is needed. This may change as the
//!   system sees more use, in which case we can revisit this design decision.
//!
//! # Usage:
//! Operations are the semantic building blocks of the system. They define what
//! computations can be performed by the system. By implementing the
//! [`Operation`] trait for a type, it can be remotely executed. Operations are
//! not necessarily meant to be executed directly, but rather embedded into
//! [`Task`](crate::task::Task)s, which contain operation arguments, additional
//! metadata, and routing information to facilitate remote execution.
//!
//! ## Example
//! ### Defining an [`Operation`]:
//!
//! ```
//! use paladin::{operation::{Operation, Result}, opkind_derive::OpKind};
//! use serde::{Deserialize, Serialize};
//!
//! #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
//! struct StringLength;
//!
//! impl Operation for StringLength {
//!     type Input = String;
//!     type Output = usize;
//!     type Kind = MyOps;
//!     
//!     fn execute(&self, input: Self::Input) -> Result<Self::Output> {
//!         Ok(input.len())
//!     }
//! }
//!
//! #[derive(OpKind, Serialize, Deserialize, Debug, Clone, Copy)]
//! enum MyOps {
//!    StringLength(StringLength),
//! }
//! ```
//!
//! ### Defining a [`Monoid`]:
//!
//! ```
//! use paladin::{operation::{Monoid, Operation, Result}, opkind_derive::OpKind};
//! use serde::{Deserialize, Serialize};
//!
//! #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
//! struct StringConcat;
//!
//! impl Monoid for StringConcat {
//!     type Elem = String;
//!     type Kind = MyOps;
//!     
//!     fn combine(&self, a: Self::Elem, b: Self::Elem) -> Result<Self::Elem> {
//!         Ok(a + &b)
//!     }
//!     
//!     fn empty(&self) -> Self::Elem {
//!         String::new()
//!     }
//! }
//!
//! #[derive(OpKind, Serialize, Deserialize, Debug, Clone, Copy)]
//! enum MyOps {
//!    StringConcat(StringConcat),
//! }
//! ```
//!
//! ### An [`Operation`] with constructor arguments:
//!
//! ```
//! use paladin::{operation::{Monoid, Operation, Result}, opkind_derive::OpKind};
//! use serde::{Deserialize, Serialize};
//!
//! #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
//! struct MultiplyBy(i32);
//!
//! impl Operation for MultiplyBy {
//!     type Input = i32;
//!     type Output = i32;
//!     type Kind = MyOps;
//!     
//!     fn execute(&self, input: Self::Input) -> Result<Self::Output> {
//!         Ok(self.0 * input)
//!     }
//! }
//!
//! #[derive(OpKind, Serialize, Deserialize, Debug, Clone, Copy)]
//! enum MyOps {
//!     MultiplyBy(MultiplyBy),
//! }
//! ```
//!
//! ### Defining a polymorphic [`Monoid`]:
//!
//! ```
//! use paladin::{
//!     operation::{Monoid, Operation, Result},
//!     opkind_derive::OpKind,
//!     serializer::Serializable,
//! };
//! use serde::{Deserialize, Serialize};
//! use std::{ops::Mul, fmt::Debug};
//! use num_traits::One;
//!
//! #[derive(Serialize, Deserialize, Debug, Clone, Copy, Default)]
//! struct GenericMultiplication<T>(std::marker::PhantomData<T>);
//!
//! impl<T: Mul<Output = T> + One + Serializable + Debug + Clone> Monoid for GenericMultiplication<T>
//! where
//!     MyOps: From<GenericMultiplication<T>>,
//! {
//!     type Elem = T;
//!     type Kind = MyOps;
//!
//!     fn empty(&self) -> Self::Elem {
//!         T::one()
//!     }
//!
//!     fn combine(&self, a: Self::Elem, b: Self::Elem) -> Result<Self::Elem> {
//!         Ok(a * b)
//!     }
//! }
//!
//! #[derive(OpKind, Serialize, Deserialize, Debug, Clone, Copy)]
//! enum MyOps {
//!     // Monomorphize
//!     GenericMultiplicationI32(GenericMultiplication<i32>),
//!     GenericMultiplicationI64(GenericMultiplication<i64>),
//! }
//! ```
//!
//! Later on ...
//! ```
//! # use paladin::{
//! #    operation::{Monoid, Operation, Result},
//! #    opkind_derive::OpKind,
//! #    serializer::Serializable,
//! # };
//! # use serde::{Deserialize, Serialize};
//! # use std::{ops::Mul, fmt::Debug};
//! # use num_traits::One;
//! #
//! # #[derive(Serialize, Deserialize, Debug, Clone, Copy, Default)]
//! # struct GenericMultiplication<T>(std::marker::PhantomData<T>);
//! #
//! # impl<T: Mul<Output = T> + One + Serializable + Debug + Clone> Monoid for GenericMultiplication<T>
//! # where
//! #    MyOps: From<GenericMultiplication<T>>,
//! # {
//! #    type Elem = T;
//! #    type Kind = MyOps;
//! #
//! #    fn empty(&self) -> Self::Elem {
//! #        T::one()
//! #    }
//! #
//! #    fn combine(&self, a: Self::Elem, b: Self::Elem) -> Result<Self::Elem> {
//! #        Ok(a * b)
//! #    }
//! # }
//! #
//! # #[derive(OpKind, Serialize, Deserialize, Debug, Clone, Copy)]
//! # enum MyOps {
//! #    // Monomorphize
//! #    GenericMultiplicationI32(GenericMultiplication<i32>),
//! #    GenericMultiplicationI64(GenericMultiplication<i64>),
//! # }
//! #
//! use paladin::{
//!     directive::{IndexedStream, Directive},
//! };
//!
//! #[tokio::main]
//! async fn main() {
//!     let computation = IndexedStream::from([1, 2, 3, 4, 5, 6])
//!         .fold(GenericMultiplication::<i32>::default());
//! }
//! ```

use std::fmt::Debug;

use crate::serializer::{Serializable, Serializer};

/// A unit of work that a worker can carry out.
///
/// Conceptually a function from [`Operation::Input`] to
/// [`Operation::Output`]: the implementing type fixes both the signature and
/// the meaning of the computation. Only [`Operation::execute`] has to be
/// written by implementors; the byte-oriented helpers are provided in terms of
/// it and of the supplied [`Serializer`].
pub trait Operation: Serializable + Clone + Debug + Into<Self::Kind> {
    /// Type of the value consumed by [`Operation::execute`].
    type Input: Serializable + Debug;
    /// Type of the value produced by [`Operation::execute`].
    type Output: Serializable + Debug;
    /// Registry type this operation converts into.
    type Kind: OpKind;

    /// Run the operation against `input`.
    fn execute(&self, input: Self::Input) -> Result<Self::Output>;

    /// Decode an [`Operation::Input`] from its byte representation.
    ///
    /// # Errors
    /// A failure reported by `serializer` is wrapped with
    /// [`FatalError::from_anyhow`] and returned.
    fn input_from_bytes(&self, serializer: Serializer, input: &[u8]) -> Result<Self::Input> {
        let decoded: Self::Input = serializer
            .from_bytes(input)
            .map_err(|cause| FatalError::from_anyhow(cause, Default::default()))?;
        Ok(decoded)
    }

    /// Encode an [`Operation::Output`] into its byte representation.
    ///
    /// # Errors
    /// A failure reported by `serializer` is wrapped with
    /// [`FatalError::from_anyhow`] and returned.
    fn output_to_bytes(&self, serializer: Serializer, output: Self::Output) -> Result<Vec<u8>> {
        let encoded = serializer
            .to_bytes(&output)
            .map_err(|cause| FatalError::from_anyhow(cause, Default::default()))?;
        Ok(encoded)
    }

    /// Decode `input`, run the operation on it, and encode the result.
    ///
    /// # Errors
    /// Stops at, and returns, the first error raised by decoding, execution, or
    /// encoding, in that order.
    fn execute_as_bytes(&self, serializer: Serializer, input: &[u8]) -> Result<Vec<u8>> {
        let decoded = self.input_from_bytes(serializer, input)?;
        let produced = self.execute(decoded)?;
        self.output_to_bytes(serializer, produced)
    }

    /// Encode the operation itself (not its input or output) as bytes.
    ///
    /// # Errors
    /// A failure reported by `serializer` is wrapped with
    /// [`FatalError::from_anyhow`] and returned.
    fn as_bytes(&self, serializer: Serializer) -> Result<Vec<u8>> {
        let encoded = serializer
            .to_bytes(self)
            .map_err(|cause| FatalError::from_anyhow(cause, Default::default()))?;
        Ok(encoded)
    }
}

/// A registry of all available operations.
///
/// Implementations MUST be an enum that groups every available [`Operation`]
/// into a single registry type. This enables operations to be serialized and
/// executed by a remote service in an opaque manner: the remote service need
/// not know the exact type of an operation, only the _kind_ of operations that
/// are possible.
///
/// Note that a [`Runtime`](crate::runtime::Runtime) instance expects to be
/// specialized with an [`OpKind`] type -- this is what enables dynamic
/// execution behavior across all available operations.
///
/// The trait declares no items of its own; it only requires its implementors
/// to be [`Serializable`], [`Clone`], and [`Debug`].
///
/// # Design rationale
/// In a world where operations are all executed in the same process, `Box<dyn
/// Operation>` would theoretically suffice. However, in a distributed system,
/// we need to serialize operations and send them back and forth between remote
/// machines. Generic trait object serialization / deserialization is
/// non-trivial and is not supported by [`serde`] out of the box. Solutions
/// like <https://docs.rs/typetag> were explored, but, unfortunately, they do
/// not support generic types.
///
/// A `proc_macro_derive`, [`crate::opkind_derive`], is provided to simplify
/// the process of facilitating opaque execution of operations. It is highly
/// recommended to use this derive macro to implement [`OpKind`] for your
/// operation registry, as there is a lot of boilerplate that needs to be taken
/// care of.
pub trait OpKind: Serializable + Clone + Debug {}

/// An associative binary [`Operation`] with an identity element.
///
/// Implementors supply an identity via [`Monoid::empty`] and a binary
/// combinator via [`Monoid::combine`]. Every [`Monoid`] automatically
/// implements [`Operation`] through a blanket implementation whose input is a
/// pair `(Elem, Elem)` and whose output is a single `Elem`.
///
/// Implementations are expected to make `combine` associative and to make
/// `empty` its identity; neither property is checked by this trait.
pub trait Monoid: Serializable + Clone + Debug + Into<Self::Kind> {
    /// The type of the elements that can be combined by the operation.
    /// Note that unlike an [`Operation`], a [`Monoid`] is a binary operation on
    /// elements of the same type. As such, it does not have distinct
    /// `Input` and `Output` types, but rather, a single `Elem` type.
    type Elem: Serializable + Debug;

    /// The operation registry type.
    type Kind: OpKind;

    /// Get the identity element of the operation. Practically, this will be
    /// used when attempting to fold over a collection of elements, and that
    /// collection is empty.
    fn empty(&self) -> Self::Elem;

    /// Combine two elements using the operation.
    ///
    /// `a` is the left-hand operand and `b` the right-hand operand.
    ///
    /// # Errors
    /// Returns an error when the implementation is unable to combine the two
    /// elements.
    fn combine(&self, a: Self::Elem, b: Self::Elem) -> Result<Self::Elem>;
}

/// Blanket implementation: every [`Monoid`] is also an [`Operation`] that
/// takes a pair of elements and yields their combination.
impl<T: Monoid> Operation for T {
    /// A pair of elements, `(left, right)`, both of the monoid's element type.
    type Input = (<T as Monoid>::Elem, <T as Monoid>::Elem);
    /// The single element obtained by combining the pair.
    type Output = <T as Monoid>::Elem;

    /// The registry type is inherited from the [`Monoid`] implementation.
    type Kind = <T as Monoid>::Kind;

    /// Combine the two halves of `input`, preserving their order.
    fn execute(&self, input: Self::Input) -> Result<Self::Output> {
        let (left, right) = input;
        T::combine(self, left, right)
    }
}

mod error;
pub use error::*;