// fission_core/async_runtime.rs

1use crate::action::ActionEnvelope;
2use serde::{de::DeserializeOwned, Deserialize, Serialize};
3use std::borrow::Cow;
4use std::future::Future;
5use std::marker::PhantomData;
6use std::pin::Pin;
7use std::sync::Arc;
8
/// Heap-allocated, pinned, type-erased future that resolves to `T`.
///
/// The erased future must be `Send` and `'static`, so a value of this type can
/// be moved across threads and does not borrow from the scope that created it.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
10
11pub trait JobSpec {
12    type Request: Serialize + DeserializeOwned + Send + 'static;
13    type Ok: Serialize + DeserializeOwned + Send + 'static;
14    type Err: Serialize + DeserializeOwned + Send + 'static;
15    const NAME: &'static str;
16}
17
18#[derive(Debug)]
19pub struct JobRef<J: JobSpec> {
20    pub name: &'static str,
21    _marker: PhantomData<fn() -> J>,
22}
23
24impl<J: JobSpec> JobRef<J> {
25    pub const fn new(name: &'static str) -> Self {
26        Self {
27            name,
28            _marker: PhantomData,
29        }
30    }
31}
32
33impl<J: JobSpec> Clone for JobRef<J> {
34    fn clone(&self) -> Self {
35        *self
36    }
37}
38
39impl<J: JobSpec> Copy for JobRef<J> {}
40
41pub trait ServiceSpec {
42    type Config: Serialize + DeserializeOwned + Send + 'static;
43    type Command: Serialize + DeserializeOwned + Send + 'static;
44    type CommandOk: Serialize + DeserializeOwned + Send + 'static;
45    type CommandErr: Serialize + DeserializeOwned + Send + 'static;
46    type Event: Serialize + DeserializeOwned + Send + 'static;
47    type StartErr: Serialize + DeserializeOwned + Send + 'static;
48    const NAME: &'static str;
49}
50
51#[derive(Debug)]
52pub struct ServiceType<S: ServiceSpec> {
53    pub name: &'static str,
54    _marker: PhantomData<fn() -> S>,
55}
56
57impl<S: ServiceSpec> ServiceType<S> {
58    pub const fn new(name: &'static str) -> Self {
59        Self {
60            name,
61            _marker: PhantomData,
62        }
63    }
64}
65
66impl<S: ServiceSpec> Clone for ServiceType<S> {
67    fn clone(&self) -> Self {
68        *self
69    }
70}
71
72impl<S: ServiceSpec> Copy for ServiceType<S> {}
73
74#[derive(Debug)]
75pub struct ServiceSlot<S: ServiceSpec> {
76    pub ty: ServiceType<S>,
77    pub slot_key: Cow<'static, str>,
78}
79
80impl<S: ServiceSpec> ServiceSlot<S> {
81    pub fn singleton(ty: ServiceType<S>) -> Self {
82        Self {
83            ty,
84            slot_key: Cow::Borrowed("singleton"),
85        }
86    }
87
88    pub fn keyed(ty: ServiceType<S>, key: impl Into<String>) -> Self {
89        Self {
90            ty,
91            slot_key: Cow::Owned(key.into()),
92        }
93    }
94
95    pub fn slot_key(&self) -> &str {
96        self.slot_key.as_ref()
97    }
98}
99
100impl<S: ServiceSpec> Clone for ServiceSlot<S> {
101    fn clone(&self) -> Self {
102        Self {
103            ty: self.ty,
104            slot_key: self.slot_key.clone(),
105        }
106    }
107}
108
109#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
110pub struct JobRequestPayload {
111    pub job_name: String,
112    pub payload: Vec<u8>,
113}
114
115#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
116pub struct ServiceStartPayload {
117    pub service_name: String,
118    pub slot_key: String,
119    pub config: Vec<u8>,
120}
121
122#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
123pub struct ServiceCommandPayload {
124    pub service_name: String,
125    pub slot_key: String,
126    pub payload: Vec<u8>,
127}
128
129#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
130pub struct ServiceStopPayload {
131    pub service_name: String,
132    pub slot_key: String,
133}
134
135#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
136pub struct ServiceBindings {
137    pub on_started: Option<ActionEnvelope>,
138    pub on_start_failed: Option<ActionEnvelope>,
139    pub on_event: Option<ActionEnvelope>,
140    pub on_stopped: Option<ActionEnvelope>,
141    pub on_command_ok: Option<ActionEnvelope>,
142    pub on_command_err: Option<ActionEnvelope>,
143}
144
145#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
146pub struct ResourceExecutionContext {
147    pub key: String,
148    pub generation: u64,
149}
150
/// Context value associated with a job; currently holds only a request id.
// NOTE(review): presumably handed to the job's handler by the runtime — confirm.
#[derive(Debug, Clone)]
pub struct JobCtx {
    /// Numeric id of the request.
    pub req_id: u64,
}
155
156type EmitFn = dyn Fn(Vec<u8>) -> BoxFuture<Result<(), String>> + Send + Sync;
157
158struct ServiceCtxInner {
159    service_name: String,
160    slot_key: String,
161    instance_id: u64,
162    emit: Arc<EmitFn>,
163}
164
165pub struct ServiceCtx<S: ServiceSpec> {
166    inner: Arc<ServiceCtxInner>,
167    _marker: PhantomData<fn() -> S>,
168}
169
170impl<S: ServiceSpec> std::fmt::Debug for ServiceCtx<S> {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        f.debug_struct("ServiceCtx")
173            .field("service_name", &self.inner.service_name)
174            .field("slot_key", &self.inner.slot_key)
175            .field("instance_id", &self.inner.instance_id)
176            .finish()
177    }
178}
179
180impl<S: ServiceSpec> Clone for ServiceCtx<S> {
181    fn clone(&self) -> Self {
182        Self {
183            inner: self.inner.clone(),
184            _marker: PhantomData,
185        }
186    }
187}
188
189impl<S: ServiceSpec> ServiceCtx<S> {
190    #[doc(hidden)]
191    pub fn new_runtime(
192        service_name: String,
193        slot_key: String,
194        instance_id: u64,
195        emit: Arc<EmitFn>,
196    ) -> Self {
197        Self {
198            inner: Arc::new(ServiceCtxInner {
199                service_name,
200                slot_key,
201                instance_id,
202                emit,
203            }),
204            _marker: PhantomData,
205        }
206    }
207
208    pub fn service_name(&self) -> &str {
209        &self.inner.service_name
210    }
211
212    pub fn slot_key(&self) -> &str {
213        &self.inner.slot_key
214    }
215
216    pub fn instance_id(&self) -> u64 {
217        self.inner.instance_id
218    }
219
220    pub fn emit(&self, event: S::Event) -> BoxFuture<Result<(), String>> {
221        match serde_json::to_vec(&event) {
222            Ok(bytes) => (self.inner.emit)(bytes),
223            Err(err) => Box::pin(async move { Err(err.to_string()) }),
224        }
225    }
226}
227
228pub trait ServiceRunner<S: ServiceSpec>: Send + 'static {
229    fn on_command(
230        &mut self,
231        command: S::Command,
232        ctx: ServiceCtx<S>,
233    ) -> BoxFuture<Result<S::CommandOk, S::CommandErr>>;
234
235    fn on_stop(self: Box<Self>, ctx: ServiceCtx<S>) -> BoxFuture<()>;
236}