Skip to main content

rio_rs/
service_object.rs

1//! Module for implementing a Rio service
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use serde::{de::DeserializeOwned, Deserialize, Serialize};
7
8use crate::app_data::AppData;
9use crate::errors::ServiceObjectLifeCycleError;
10use crate::protocol::{ClientError, RequestEnvelope, RequestError};
11use crate::registry::{Handler, IdentifiableType, Message};
12use crate::server::{AdminCommands, AdminSender, InternalClientSender, SendCommand};
13
/// Internal representation of an object id.
///
/// It is a pair made of the struct name (field `0`) and the object's own id
/// (field `1`, as in [`WithId`]). It is used for lookups across this project.
#[derive(Debug)]
pub struct ObjectId(pub String, pub String);

impl ObjectId {
    /// Builds an [`ObjectId`] from anything that converts into a [`String`].
    ///
    /// `struct_name` ends up in field `0` and `object_id` in field `1`.
    pub fn new(struct_name: impl Into<String>, object_id: impl Into<String>) -> Self {
        let type_name: String = struct_name.into();
        let instance_id: String = object_id.into();
        Self(type_name, instance_id)
    }
}
26
/// Common interface to get a string id for an object.
///
/// This is particularly useful for the registry, as every object
/// in the registry needs to have an id for retrieval.
// TODO move it out of the service_object module
pub trait WithId {
    /// Returns this object's id as a string slice.
    fn id(&self) -> &str;

    /// Hands the object the id it should use; how it is kept is up to the implementor.
    fn set_id(&mut self, id: String);
}
36
37/// ServiceObjects are the objects that will respond to various types of messages through
38/// the Rio Server
39///
40/// The server stores each ServiceObject onto a registry and control their life cycle
41///
42/// There are a few requirements in oder to implement a ServiceObject:
43///     - Default
44///     - WithId
45///     - IdentifiableType
46///     - ObjectStateManager
47///     - ServiceObjectStateLoad
48#[async_trait]
49pub trait ServiceObject: Default + WithId + IdentifiableType {
50    /// Send a message to Rio cluster using a client tht is stored in AppData
51    async fn send<T, V, E>(
52        app_data: &AppData,
53        handler_type_id: impl ToString + Send + Sync,
54        handler_id: impl ToString + Send + Sync,
55        payload: &V,
56    ) -> Result<T, RequestError<E>>
57    where
58        E: std::error::Error + DeserializeOwned + Clone + Send + Sync,
59        T: DeserializeOwned + Send + Sync,
60        V: Serialize + IdentifiableType + Send + Sync,
61    {
62        let client = app_data.get::<InternalClientSender>();
63        let payload = bincode::serialize(&payload).expect("TODO");
64        let request = RequestEnvelope::new(
65            handler_type_id.to_string(),
66            handler_id.to_string(),
67            V::user_defined_type_id().to_string(),
68            payload,
69        );
70        let (request_message, channel) = SendCommand::build(request);
71        client
72            .send(request_message)
73            .map_err(|e| ClientError::IoError(e.to_string()))?;
74
75        let resp = channel
76            .await
77            .map_err(|e| ClientError::IoError(e.to_string()))??;
78
79        let parsed_body = bincode::deserialize::<T>(&resp)
80            .map_err(|e| ClientError::DeseralizationError(e.to_string()))?;
81        Ok(parsed_body)
82    }
83
84    async fn before_load(&mut self, _: Arc<AppData>) -> Result<(), ServiceObjectLifeCycleError> {
85        Ok(())
86    }
87
88    async fn after_load(&mut self, _: Arc<AppData>) -> Result<(), ServiceObjectLifeCycleError> {
89        Ok(())
90    }
91
92    async fn before_shutdown(
93        &mut self,
94        _: Arc<AppData>,
95    ) -> Result<(), ServiceObjectLifeCycleError> {
96        Ok(())
97    }
98
99    async fn shutdown(
100        &mut self,
101        app_data: Arc<AppData>,
102    ) -> Result<(), ServiceObjectLifeCycleError> {
103        self.before_shutdown(app_data.clone()).await?;
104        let admin_sender = app_data.get::<AdminSender>().clone();
105        admin_sender
106            .send(AdminCommands::Shutdown(
107                Self::user_defined_type_id().to_string(),
108                self.id().to_string(),
109            ))
110            .expect("TODO");
111        Ok(())
112    }
113}
114
115/// Load all states for a into a ServiceObject
116#[async_trait]
117pub trait ServiceObjectStateLoad {
118    async fn load(&mut self, _: &AppData) -> Result<(), ServiceObjectLifeCycleError> {
119        Ok(())
120    }
121}
122
123/// [Message] that is sent to the object when it reaches specific
124/// parts of its life cycle
125#[derive(Debug, Serialize, Deserialize)]
126pub enum LifecycleMessage {
127    Load,
128}
129
130impl Message for LifecycleMessage {}
131
132impl IdentifiableType for LifecycleMessage {
133    fn user_defined_type_id() -> &'static str {
134        "LifecycleMessage"
135    }
136}
137
138#[async_trait]
139impl<T> Handler<LifecycleMessage> for T
140where
141    T: ServiceObject + ServiceObjectStateLoad + Send + Sync,
142{
143    type Returns = ();
144    type Error = ServiceObjectLifeCycleError;
145
146    async fn handle(
147        &mut self,
148        message: LifecycleMessage,
149        context: Arc<AppData>,
150    ) -> Result<Self::Returns, Self::Error> {
151        match message {
152            LifecycleMessage::Load => {
153                self.before_load(context.clone()).await?;
154                self.load(context.clone().as_ref()).await?;
155                self.after_load(context.clone()).await?;
156                Ok(())
157            }
158        }
159    }
160}
161
#[cfg(test)]
mod test {
    use super::*;

    use crate::protocol::NoopError;

    /// Compile-time check only: it makes sure that `ServiceObject::send` can be called from
    /// a service object with concrete message, response and error types. Nothing is executed
    /// or asserted at runtime.
    #[tokio::test]
    async fn asd() {
        // Minimal service object relying on every default method of `ServiceObject`.
        #[derive(Default)]
        struct DummyService {}

        impl IdentifiableType for DummyService {
            fn user_defined_type_id() -> &'static str {
                "DummyService"
            }
        }

        impl WithId for DummyService {
            fn set_id(&mut self, _: String) {}
            fn id(&self) -> &str {
                ""
            }
        }

        impl ServiceObject for DummyService {}

        // Message used both as the request payload and as the expected response type.
        #[derive(Default, Serialize, Deserialize)]
        struct DummyMessage {}

        impl IdentifiableType for DummyMessage {
            fn user_defined_type_id() -> &'static str {
                "DummyMessage"
            }
        }

        impl DummyService {
            // Never called: this method only has to type-check.
            #[allow(unused)]
            async fn test(&self, app_data: &AppData, handler_type_id: String, handler_id: String) {
                let message = DummyMessage::default();
                let _reply: Result<DummyMessage, RequestError<NoopError>> =
                    Self::send(app_data, handler_type_id, handler_id, &message).await;
            }
        }
    }
}