gadget_common/
protocol.rs

1use crate::environments::GadgetEnvironment;
2use crate::module::Job;
3use async_trait::async_trait;
4use gadget_core::job::{BuiltExecutableJobWrapper, JobError};
5use gadget_core::job_manager::{ProtocolRemote, ShutdownReason, WorkManagerInterface};
6use gadget_io::tokio::sync::mpsc::UnboundedReceiver;
7use parking_lot::Mutex;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10
11pub struct AsyncProtocolRemote<Env: GadgetEnvironment> {
12    pub start_tx: Mutex<Option<gadget_io::tokio::sync::oneshot::Sender<()>>>,
13    pub shutdown_tx: Mutex<Option<gadget_io::tokio::sync::oneshot::Sender<ShutdownReason>>>,
14    pub associated_session_id: <Env::WorkManager as WorkManagerInterface>::SessionID,
15    pub associated_block_id: <Env::WorkManager as WorkManagerInterface>::Clock,
16    pub associated_retry_id: <Env::WorkManager as WorkManagerInterface>::RetryID,
17    pub associated_task_id: <Env::WorkManager as WorkManagerInterface>::TaskID,
18    pub to_async_protocol:
19        gadget_io::tokio::sync::mpsc::UnboundedSender<<Env as GadgetEnvironment>::ProtocolMessage>,
20    pub is_done: Arc<AtomicBool>,
21}
22
23#[async_trait]
24pub trait AsyncProtocol<Env: GadgetEnvironment> {
25    type AdditionalParams: Send + Sync + 'static;
26    async fn generate_protocol_from(
27        &self,
28        associated_block_id: <Env::WorkManager as WorkManagerInterface>::Clock,
29        associated_retry_id: <Env::WorkManager as WorkManagerInterface>::RetryID,
30        associated_session_id: <Env::WorkManager as WorkManagerInterface>::SessionID,
31        associated_task_id: <Env::WorkManager as WorkManagerInterface>::TaskID,
32        protocol_message_rx: UnboundedReceiver<Env::ProtocolMessage>,
33        additional_params: Self::AdditionalParams,
34    ) -> Result<BuiltExecutableJobWrapper, JobError>;
35
36    async fn create(
37        &self,
38        session_id: <Env::WorkManager as WorkManagerInterface>::SessionID,
39        now: <Env::WorkManager as WorkManagerInterface>::Clock,
40        retry_id: <Env::WorkManager as WorkManagerInterface>::RetryID,
41        task_id: <Env::WorkManager as WorkManagerInterface>::TaskID,
42        additional_params: Self::AdditionalParams,
43    ) -> Result<Job<Env>, JobError> {
44        let is_done = Arc::new(AtomicBool::new(false));
45        let (to_async_protocol, protocol_message_rx) =
46            gadget_io::tokio::sync::mpsc::unbounded_channel();
47        let (start_tx, start_rx) = gadget_io::tokio::sync::oneshot::channel();
48        let (shutdown_tx, shutdown_rx) = gadget_io::tokio::sync::oneshot::channel();
49        let async_protocol = self
50            .generate_protocol_from(
51                now,
52                retry_id,
53                session_id,
54                task_id,
55                protocol_message_rx,
56                additional_params,
57            )
58            .await?;
59
60        let remote = AsyncProtocolRemote {
61            start_tx: Mutex::new(Some(start_tx)),
62            shutdown_tx: Mutex::new(Some(shutdown_tx)),
63            associated_block_id: now,
64            associated_retry_id: retry_id,
65            associated_task_id: task_id,
66            associated_session_id: session_id,
67            to_async_protocol,
68            is_done: is_done.clone(),
69        };
70
71        let job_manager_compatible_protocol = crate::helpers::create_job_manager_compatible_job(
72            is_done,
73            start_rx,
74            shutdown_rx,
75            async_protocol,
76        );
77
78        Ok((remote, job_manager_compatible_protocol))
79    }
80}
81
82impl<Env: GadgetEnvironment> ProtocolRemote<<Env as GadgetEnvironment>::WorkManager>
83    for AsyncProtocolRemote<Env>
84{
85    fn start(&self) -> Result<(), Env::Error> {
86        self.start_tx
87            .lock()
88            .take()
89            .ok_or_else(|| Env::Error::from("Protocol already started".to_string()))?
90            .send(())
91            .map_err(|_err| Env::Error::from("Unable to start protocol".to_string()))
92    }
93
94    fn session_id(&self) -> <Env::WorkManager as WorkManagerInterface>::SessionID {
95        self.associated_session_id
96    }
97
98    fn started_at(&self) -> <Env::WorkManager as WorkManagerInterface>::Clock {
99        self.associated_block_id
100    }
101
102    fn shutdown(&self, reason: ShutdownReason) -> Result<(), Env::Error> {
103        self.shutdown_tx
104            .lock()
105            .take()
106            .ok_or_else(|| Env::Error::from("Protocol already shutdown".to_string()))?
107            .send(reason)
108            .map_err(|reason| {
109                Env::Error::from(format!(
110                    "Unable to shutdown protocol with status {reason:?}"
111                ))
112            })
113    }
114
115    fn is_done(&self) -> bool {
116        self.is_done.load(Ordering::SeqCst)
117    }
118
119    fn deliver_message(&self, message: Env::ProtocolMessage) -> Result<(), Env::Error> {
120        self.to_async_protocol
121            .send(message)
122            .map_err(|err| Env::Error::from(err.to_string()))
123    }
124
125    fn has_started(&self) -> bool {
126        self.start_tx.lock().is_none()
127    }
128
129    fn retry_id(&self) -> <Env::WorkManager as WorkManagerInterface>::RetryID {
130        self.associated_retry_id
131    }
132}