1use super::super::channels::ChannelName;
2use super::super::codec::{DispatchRequest, DispatchResponse};
3use super::super::error::AionSurfaceError;
4use super::super::types::{ActivityRequest, ActivityResult};
5use crate::conversation::ParticipantPid;
6use crate::routing::{ConsumerId, ConsumerStateView};
7
8#[derive(Clone, Debug, PartialEq, Eq)]
10pub struct DispatchWorker {
11 pub worker_id: String,
13 pub participant: ParticipantPid,
15 pub consumer_state: ConsumerStateView,
17}
18
19impl DispatchWorker {
20 #[must_use]
22 pub fn new(worker_id: impl Into<String>, participant: ParticipantPid) -> Self {
23 let worker_id = worker_id.into();
24 let consumer_state =
25 ConsumerStateView::new(ConsumerId::new(worker_id.clone()), 0, 1, 0, Vec::new());
26 Self {
27 worker_id,
28 participant,
29 consumer_state,
30 }
31 }
32
33 #[must_use]
35 pub fn with_consumer_state(
36 worker_id: impl Into<String>,
37 participant: ParticipantPid,
38 consumer_state: ConsumerStateView,
39 ) -> Self {
40 Self {
41 worker_id: worker_id.into(),
42 participant,
43 consumer_state,
44 }
45 }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq)]
50pub enum ActivityDispatchState {
51 ActivityScheduled,
53 ActivityStarted,
55 ActivityCompleted,
57 ActivityFailed { retry_eligible: bool },
59}
60
61#[derive(Clone, Debug, PartialEq, Eq)]
63pub enum DispatchOperationKind {
64 ConversationOpened,
66 WorkerSelected,
68 MessageSent,
70 MessageReceived,
72 WorkerExited,
74 ConversationClosed,
76}
77
78#[derive(Clone, Debug, PartialEq, Eq)]
80pub struct DispatchOperation {
81 pub kind: DispatchOperationKind,
83 pub conversation_id: String,
85 pub channel_name: String,
87 pub worker_id: Option<String>,
89 pub activity_state: Option<ActivityDispatchState>,
91 pub result: Option<ActivityResult>,
93 pub message: Option<String>,
95}
96
97impl DispatchOperation {
98 pub(crate) fn new(
99 kind: DispatchOperationKind,
100 conversation_id: &str,
101 channel_name: &ChannelName,
102 ) -> Self {
103 Self {
104 kind,
105 conversation_id: conversation_id.to_owned(),
106 channel_name: String::from(channel_name.clone()),
107 worker_id: None,
108 activity_state: None,
109 result: None,
110 message: None,
111 }
112 }
113
114 pub(crate) fn worker(mut self, worker_id: impl Into<String>) -> Self {
115 self.worker_id = Some(worker_id.into());
116 self
117 }
118
119 pub(crate) const fn state(mut self, state: ActivityDispatchState) -> Self {
120 self.activity_state = Some(state);
121 self
122 }
123
124 pub(crate) fn result(mut self, result: ActivityResult) -> Self {
125 self.result = Some(result);
126 self
127 }
128
129 pub(crate) fn message(mut self, message: impl Into<String>) -> Self {
130 self.message = Some(message.into());
131 self
132 }
133}
134
135#[derive(Clone, Debug, PartialEq, Eq)]
137pub struct RecordedDispatchOutcome {
138 pub result: Result<ActivityResult, AionSurfaceError>,
140}
141
142impl RecordedDispatchOutcome {
143 #[must_use]
145 pub const fn new(result: Result<ActivityResult, AionSurfaceError>) -> Self {
146 Self { result }
147 }
148
149 pub(crate) fn into_result(self) -> Result<ActivityResult, AionSurfaceError> {
150 self.result
151 }
152}
153
154pub trait DispatchRecorder: std::fmt::Debug + Send + Sync {
156 fn replay_outcome(
162 &self,
163 channel_name: &str,
164 request: &ActivityRequest,
165 ) -> Result<Option<RecordedDispatchOutcome>, AionSurfaceError>;
166
167 fn record(&self, operation: DispatchOperation) -> Result<(), AionSurfaceError>;
173}
174
175pub trait DispatchWorkerPool: std::fmt::Debug + Send + Sync {
177 fn workers_for(
183 &self,
184 channel_name: &ChannelName,
185 request: &ActivityRequest,
186 ) -> Result<Vec<DispatchWorker>, AionSurfaceError>;
187}
188
189pub trait DispatchRouter: std::fmt::Debug + Send + Sync {
191 fn select_worker(
197 &self,
198 workflow_id: &str,
199 channel_name: &ChannelName,
200 request: &ActivityRequest,
201 candidates: &[DispatchWorker],
202 excluded_worker_ids: &[String],
203 ) -> Result<Option<DispatchWorker>, AionSurfaceError>;
204}
205
206#[derive(Clone, Debug, PartialEq, Eq)]
208pub enum DispatchConversationEvent {
209 Response(DispatchResponse),
211 WorkerExited { worker_id: String, message: String },
213}
214
215pub trait DispatchConversation: std::fmt::Debug + Send {
217 fn link_worker(&mut self, worker: &DispatchWorker) -> Result<(), AionSurfaceError>;
223
224 fn send(&mut self, request: DispatchRequest) -> Result<(), AionSurfaceError>;
230
231 fn receive(&mut self) -> Result<DispatchConversationEvent, AionSurfaceError>;
237
238 fn close(&mut self) -> Result<(), AionSurfaceError>;
244}
245
246pub trait DispatchConversationFactory: std::fmt::Debug + Send + Sync {
248 fn open(
254 &self,
255 workflow_id: &str,
256 channel_name: &ChannelName,
257 conversation_id: &str,
258 ) -> Result<Box<dyn DispatchConversation>, AionSurfaceError>;
259}
260
261pub trait ConversationIdProvider: std::fmt::Debug + Send + Sync {
263 fn next_conversation_id(&self) -> String;
265}