gadget_common/
protocol.rs1use crate::environments::GadgetEnvironment;
2use crate::module::Job;
3use async_trait::async_trait;
4use gadget_core::job::{BuiltExecutableJobWrapper, JobError};
5use gadget_core::job_manager::{ProtocolRemote, ShutdownReason, WorkManagerInterface};
6use gadget_io::tokio::sync::mpsc::UnboundedReceiver;
7use parking_lot::Mutex;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10
11pub struct AsyncProtocolRemote<Env: GadgetEnvironment> {
12 pub start_tx: Mutex<Option<gadget_io::tokio::sync::oneshot::Sender<()>>>,
13 pub shutdown_tx: Mutex<Option<gadget_io::tokio::sync::oneshot::Sender<ShutdownReason>>>,
14 pub associated_session_id: <Env::WorkManager as WorkManagerInterface>::SessionID,
15 pub associated_block_id: <Env::WorkManager as WorkManagerInterface>::Clock,
16 pub associated_retry_id: <Env::WorkManager as WorkManagerInterface>::RetryID,
17 pub associated_task_id: <Env::WorkManager as WorkManagerInterface>::TaskID,
18 pub to_async_protocol:
19 gadget_io::tokio::sync::mpsc::UnboundedSender<<Env as GadgetEnvironment>::ProtocolMessage>,
20 pub is_done: Arc<AtomicBool>,
21}
22
23#[async_trait]
24pub trait AsyncProtocol<Env: GadgetEnvironment> {
25 type AdditionalParams: Send + Sync + 'static;
26 async fn generate_protocol_from(
27 &self,
28 associated_block_id: <Env::WorkManager as WorkManagerInterface>::Clock,
29 associated_retry_id: <Env::WorkManager as WorkManagerInterface>::RetryID,
30 associated_session_id: <Env::WorkManager as WorkManagerInterface>::SessionID,
31 associated_task_id: <Env::WorkManager as WorkManagerInterface>::TaskID,
32 protocol_message_rx: UnboundedReceiver<Env::ProtocolMessage>,
33 additional_params: Self::AdditionalParams,
34 ) -> Result<BuiltExecutableJobWrapper, JobError>;
35
36 async fn create(
37 &self,
38 session_id: <Env::WorkManager as WorkManagerInterface>::SessionID,
39 now: <Env::WorkManager as WorkManagerInterface>::Clock,
40 retry_id: <Env::WorkManager as WorkManagerInterface>::RetryID,
41 task_id: <Env::WorkManager as WorkManagerInterface>::TaskID,
42 additional_params: Self::AdditionalParams,
43 ) -> Result<Job<Env>, JobError> {
44 let is_done = Arc::new(AtomicBool::new(false));
45 let (to_async_protocol, protocol_message_rx) =
46 gadget_io::tokio::sync::mpsc::unbounded_channel();
47 let (start_tx, start_rx) = gadget_io::tokio::sync::oneshot::channel();
48 let (shutdown_tx, shutdown_rx) = gadget_io::tokio::sync::oneshot::channel();
49 let async_protocol = self
50 .generate_protocol_from(
51 now,
52 retry_id,
53 session_id,
54 task_id,
55 protocol_message_rx,
56 additional_params,
57 )
58 .await?;
59
60 let remote = AsyncProtocolRemote {
61 start_tx: Mutex::new(Some(start_tx)),
62 shutdown_tx: Mutex::new(Some(shutdown_tx)),
63 associated_block_id: now,
64 associated_retry_id: retry_id,
65 associated_task_id: task_id,
66 associated_session_id: session_id,
67 to_async_protocol,
68 is_done: is_done.clone(),
69 };
70
71 let job_manager_compatible_protocol = crate::helpers::create_job_manager_compatible_job(
72 is_done,
73 start_rx,
74 shutdown_rx,
75 async_protocol,
76 );
77
78 Ok((remote, job_manager_compatible_protocol))
79 }
80}
81
82impl<Env: GadgetEnvironment> ProtocolRemote<<Env as GadgetEnvironment>::WorkManager>
83 for AsyncProtocolRemote<Env>
84{
85 fn start(&self) -> Result<(), Env::Error> {
86 self.start_tx
87 .lock()
88 .take()
89 .ok_or_else(|| Env::Error::from("Protocol already started".to_string()))?
90 .send(())
91 .map_err(|_err| Env::Error::from("Unable to start protocol".to_string()))
92 }
93
94 fn session_id(&self) -> <Env::WorkManager as WorkManagerInterface>::SessionID {
95 self.associated_session_id
96 }
97
98 fn started_at(&self) -> <Env::WorkManager as WorkManagerInterface>::Clock {
99 self.associated_block_id
100 }
101
102 fn shutdown(&self, reason: ShutdownReason) -> Result<(), Env::Error> {
103 self.shutdown_tx
104 .lock()
105 .take()
106 .ok_or_else(|| Env::Error::from("Protocol already shutdown".to_string()))?
107 .send(reason)
108 .map_err(|reason| {
109 Env::Error::from(format!(
110 "Unable to shutdown protocol with status {reason:?}"
111 ))
112 })
113 }
114
115 fn is_done(&self) -> bool {
116 self.is_done.load(Ordering::SeqCst)
117 }
118
119 fn deliver_message(&self, message: Env::ProtocolMessage) -> Result<(), Env::Error> {
120 self.to_async_protocol
121 .send(message)
122 .map_err(|err| Env::Error::from(err.to_string()))
123 }
124
125 fn has_started(&self) -> bool {
126 self.start_tx.lock().is_none()
127 }
128
129 fn retry_id(&self) -> <Env::WorkManager as WorkManagerInterface>::RetryID {
130 self.associated_retry_id
131 }
132}