use super::super::channels::ChannelName;
use super::super::codec::{DispatchRequest, DispatchResponse};
use super::super::error::AionSurfaceError;
use super::super::types::{ActivityRequest, ActivityResult};
use crate::conversation::ParticipantPid;
use crate::routing::{ConsumerId, ConsumerStateView};
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DispatchWorker {
pub worker_id: String,
pub participant: ParticipantPid,
pub consumer_state: ConsumerStateView,
}
impl DispatchWorker {
#[must_use]
pub fn new(worker_id: impl Into<String>, participant: ParticipantPid) -> Self {
let worker_id = worker_id.into();
let consumer_state =
ConsumerStateView::new(ConsumerId::new(worker_id.clone()), 0, 1, 0, Vec::new());
Self {
worker_id,
participant,
consumer_state,
}
}
#[must_use]
pub fn with_consumer_state(
worker_id: impl Into<String>,
participant: ParticipantPid,
consumer_state: ConsumerStateView,
) -> Self {
Self {
worker_id: worker_id.into(),
participant,
consumer_state,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ActivityDispatchState {
ActivityScheduled,
ActivityStarted,
ActivityCompleted,
ActivityFailed { retry_eligible: bool },
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchOperationKind {
ConversationOpened,
WorkerSelected,
MessageSent,
MessageReceived,
WorkerExited,
ConversationClosed,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DispatchOperation {
pub kind: DispatchOperationKind,
pub conversation_id: String,
pub channel_name: String,
pub worker_id: Option<String>,
pub activity_state: Option<ActivityDispatchState>,
pub result: Option<ActivityResult>,
pub message: Option<String>,
}
impl DispatchOperation {
pub(crate) fn new(
kind: DispatchOperationKind,
conversation_id: &str,
channel_name: &ChannelName,
) -> Self {
Self {
kind,
conversation_id: conversation_id.to_owned(),
channel_name: String::from(channel_name.clone()),
worker_id: None,
activity_state: None,
result: None,
message: None,
}
}
pub(crate) fn worker(mut self, worker_id: impl Into<String>) -> Self {
self.worker_id = Some(worker_id.into());
self
}
pub(crate) const fn state(mut self, state: ActivityDispatchState) -> Self {
self.activity_state = Some(state);
self
}
pub(crate) fn result(mut self, result: ActivityResult) -> Self {
self.result = Some(result);
self
}
pub(crate) fn message(mut self, message: impl Into<String>) -> Self {
self.message = Some(message.into());
self
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RecordedDispatchOutcome {
pub result: Result<ActivityResult, AionSurfaceError>,
}
impl RecordedDispatchOutcome {
#[must_use]
pub const fn new(result: Result<ActivityResult, AionSurfaceError>) -> Self {
Self { result }
}
pub(crate) fn into_result(self) -> Result<ActivityResult, AionSurfaceError> {
self.result
}
}
pub trait DispatchRecorder: std::fmt::Debug + Send + Sync {
fn replay_outcome(
&self,
channel_name: &str,
request: &ActivityRequest,
) -> Result<Option<RecordedDispatchOutcome>, AionSurfaceError>;
fn record(&self, operation: DispatchOperation) -> Result<(), AionSurfaceError>;
}
pub trait DispatchWorkerPool: std::fmt::Debug + Send + Sync {
fn workers_for(
&self,
channel_name: &ChannelName,
request: &ActivityRequest,
) -> Result<Vec<DispatchWorker>, AionSurfaceError>;
}
pub trait DispatchRouter: std::fmt::Debug + Send + Sync {
fn select_worker(
&self,
workflow_id: &str,
channel_name: &ChannelName,
request: &ActivityRequest,
candidates: &[DispatchWorker],
excluded_worker_ids: &[String],
) -> Result<Option<DispatchWorker>, AionSurfaceError>;
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchConversationEvent {
Response(DispatchResponse),
WorkerExited { worker_id: String, message: String },
}
pub trait DispatchConversation: std::fmt::Debug + Send {
fn link_worker(&mut self, worker: &DispatchWorker) -> Result<(), AionSurfaceError>;
fn send(&mut self, request: DispatchRequest) -> Result<(), AionSurfaceError>;
fn receive(&mut self) -> Result<DispatchConversationEvent, AionSurfaceError>;
fn close(&mut self) -> Result<(), AionSurfaceError>;
}
pub trait DispatchConversationFactory: std::fmt::Debug + Send + Sync {
fn open(
&self,
workflow_id: &str,
channel_name: &ChannelName,
conversation_id: &str,
) -> Result<Box<dyn DispatchConversation>, AionSurfaceError>;
}
pub trait ConversationIdProvider: std::fmt::Debug + Send + Sync {
fn next_conversation_id(&self) -> String;
}