use crate::saga_action_error::ActionError;
use crate::saga_action_error::UndoActionError;
use crate::SagaId;
use anyhow::anyhow;
use anyhow::Context;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use thiserror::Error;
// Identifier for one node of a saga, stored as a bare `u32`.
//
// `#[serde(transparent)]` makes the wrapper (de)serialize exactly like the
// inner integer.  `Ord` is derived, which is what allows the id to be used as
// a `BTreeMap` key by the saga log.
//
// These are deliberately plain comments rather than doc comments: the
// `JsonSchema` derive copies doc text into the generated schema description.
#[derive(
    Clone,
    Copy,
    Deserialize,
    Eq,
    JsonSchema,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize,
)]
#[serde(transparent)]
pub struct SagaNodeId(u32);
// Helper-macro invocations for `SagaNodeId`.  The macros themselves are
// defined outside this view; going by their names they presumably generate
// the `Debug`, `Display` and `From` impls for the newtype -- confirm against
// the macro definitions.  Each invocation repeats the struct declaration, so
// keep these in sync with the definition of `SagaNodeId`.
NewtypeDebug! { () pub struct SagaNodeId(u32); }
NewtypeDisplay! { () pub struct SagaNodeId(u32); }
NewtypeFrom! { () pub struct SagaNodeId(u32); }
#[derive(Debug, Clone, Error)]
pub enum SagaLogError {
#[error(
"event type {event_type} is illegal with current load status \
{current_status:?}"
)]
IllegalEventForState {
current_status: SagaNodeLoadStatus,
event_type: SagaNodeEventType,
},
}
/// One entry in a saga's log: a single event for a single node of one saga.
///
/// `Debug` is implemented by hand for this type rather than derived.
#[derive(Clone, Serialize, Deserialize)]
pub struct SagaNodeEvent {
    /// Saga that this event belongs to.
    pub saga_id: SagaId,
    /// Node within the saga that the event describes.
    pub node_id: SagaNodeId,
    /// What happened to the node.
    pub event_type: SagaNodeEventType,
}
impl fmt::Debug for SagaNodeEvent {
    /// Renders the event compactly as `N<node id> <event type>`.
    ///
    /// The node id is formatted with a `0>3` fill/width spec and the event
    /// type through its `Display` impl.  The saga id is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { node_id, event_type, .. } = self;
        f.write_fmt(format_args!("N{:0>3} {}", node_id, event_type))
    }
}
/// The kinds of event that can be recorded for a saga node.
///
/// Variant names are (de)serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SagaNodeEventType {
    /// The node was started.
    Started,
    /// The node succeeded; carries a shared JSON value (presumably the
    /// action's output -- confirm against the producer of these events).
    Succeeded(Arc<serde_json::Value>),
    /// The node failed with the given error.
    Failed(ActionError),
    /// Undoing the node was started.
    UndoStarted,
    /// Undoing the node finished.
    UndoFinished,
    /// Undoing the node failed with the given error.
    UndoFailed(UndoActionError),
}
impl fmt::Display for SagaNodeEventType {
    /// Writes the event type's label (see `label()`) verbatim.
    ///
    /// The text goes straight through `write_str`, so width, fill and
    /// alignment flags on the formatter have no effect.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text: &'static str = self.label();
        f.write_str(text)
    }
}
impl SagaNodeEventType {
pub fn label(&self) -> &'static str {
match self {
SagaNodeEventType::Started => "started",
SagaNodeEventType::Succeeded(_) => "succeeded",
SagaNodeEventType::Failed(_) => "failed",
SagaNodeEventType::UndoStarted => "undo_started",
SagaNodeEventType::UndoFinished => "undo_finished",
SagaNodeEventType::UndoFailed(_) => "undo_failed",
}
}
}
/// Status of a node as reconstructed from the events recorded for it.
#[derive(Debug, Clone)]
pub enum SagaNodeLoadStatus {
    /// No event has been recorded for the node.
    NeverStarted,
    /// A `Started` event has been recorded and nothing since.
    Started,
    /// The node succeeded; holds the value carried by the `Succeeded` event.
    Succeeded(Arc<serde_json::Value>),
    /// The node failed with the given error.
    Failed(ActionError),
    /// An undo was started after success; keeps the value the node
    /// succeeded with.
    UndoStarted(Arc<serde_json::Value>),
    /// The undo finished.
    UndoFinished,
    /// The undo failed with the given error.
    UndoFailed(UndoActionError),
}
impl SagaNodeLoadStatus {
fn next_status(
&self,
event_type: &SagaNodeEventType,
) -> Result<SagaNodeLoadStatus, SagaLogError> {
match (self, event_type) {
(SagaNodeLoadStatus::NeverStarted, SagaNodeEventType::Started) => {
Ok(SagaNodeLoadStatus::Started)
}
(
SagaNodeLoadStatus::Started,
SagaNodeEventType::Succeeded(out),
) => Ok(SagaNodeLoadStatus::Succeeded(Arc::clone(out))),
(SagaNodeLoadStatus::Started, SagaNodeEventType::Failed(e)) => {
Ok(SagaNodeLoadStatus::Failed(e.clone()))
}
(
SagaNodeLoadStatus::Succeeded(out),
SagaNodeEventType::UndoStarted,
) => Ok(SagaNodeLoadStatus::UndoStarted(Arc::clone(out))),
(
SagaNodeLoadStatus::UndoStarted(_),
SagaNodeEventType::UndoFinished,
) => Ok(SagaNodeLoadStatus::UndoFinished),
(
SagaNodeLoadStatus::UndoStarted(_),
SagaNodeEventType::UndoFailed(e),
) => Ok(SagaNodeLoadStatus::UndoFailed(e.clone())),
_ => Err(SagaLogError::IllegalEventForState {
current_status: self.clone(),
event_type: event_type.clone(),
}),
}
}
}
/// In-memory log of the events recorded for one saga, together with the
/// per-node status derived from those events.
#[derive(Debug, Clone)]
pub struct SagaLog {
    /// Saga this log belongs to.
    saga_id: SagaId,
    /// Set once any node has reached `Failed`, `UndoStarted` or
    /// `UndoFinished`; never cleared afterwards.
    unwinding: bool,
    /// Every accepted event, in the order it was recorded.
    events: Vec<SagaNodeEvent>,
    /// Latest status of each node that has at least one recorded event.
    node_status: BTreeMap<SagaNodeId, SagaNodeLoadStatus>,
}
impl SagaLog {
pub fn new_empty(saga_id: SagaId) -> SagaLog {
SagaLog {
saga_id,
events: Vec::new(),
node_status: BTreeMap::new(),
unwinding: false,
}
}
pub fn new_recover(
saga_id: SagaId,
mut events: Vec<SagaNodeEvent>,
) -> Result<SagaLog, anyhow::Error> {
let mut log = Self::new_empty(saga_id);
events.sort_by_key(|f| match f.event_type {
SagaNodeEventType::Started => 1,
SagaNodeEventType::Succeeded(_) => 2,
SagaNodeEventType::Failed(_) => 3,
SagaNodeEventType::UndoStarted => 4,
SagaNodeEventType::UndoFinished => 5,
SagaNodeEventType::UndoFailed(_) => 6,
});
for event in events {
if event.saga_id != saga_id {
return Err(anyhow!(
"found an event in the log for a different saga ({}) than \
requested ({})",
event.saga_id,
saga_id,
));
}
log.record(&event).with_context(|| "SagaLog::new_recover")?;
}
Ok(log)
}
pub fn record(
&mut self,
event: &SagaNodeEvent,
) -> Result<(), SagaLogError> {
let current_status = self.load_status_for_node(event.node_id);
let next_status = current_status.next_status(&event.event_type)?;
match next_status {
SagaNodeLoadStatus::Failed(_)
| SagaNodeLoadStatus::UndoStarted(_)
| SagaNodeLoadStatus::UndoFinished => {
self.unwinding = true;
}
_ => (),
};
self.node_status.insert(event.node_id, next_status);
self.events.push(event.clone());
Ok(())
}
pub fn unwinding(&self) -> bool {
self.unwinding
}
pub fn load_status_for_node(
&self,
node_id: SagaNodeId,
) -> &SagaNodeLoadStatus {
self.node_status
.get(&node_id)
.unwrap_or(&SagaNodeLoadStatus::NeverStarted)
}
pub fn events(&self) -> &[SagaNodeEvent] {
&self.events
}
pub fn pretty(&self) -> SagaLogPretty<'_> {
SagaLogPretty { log: self }
}
}
/// Borrowing wrapper around a [`SagaLog`] whose `Debug` implementation
/// prints a multi-line, human-readable dump of the log.
pub struct SagaLogPretty<'log> {
    /// The log to print.
    log: &'log SagaLog,
}
impl fmt::Debug for SagaLogPretty<'_> {
    /// Prints a multi-line summary of the wrapped log.
    ///
    /// The output is a header line, the saga id, the direction (`forward`,
    /// or `unwinding` once the log's unwinding flag is set), the event
    /// count, a blank line, and then one line per event.  Event lines are
    /// numbered from 1, zero-padded to three digits, and use the event's
    /// `Debug` form.  Every line, including the last, ends with a newline.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let log = self.log;
        let direction = if log.unwinding { "unwinding" } else { "forward" };

        writeln!(f, "SAGA LOG:")?;
        writeln!(f, "saga id: {}", log.saga_id)?;
        writeln!(f, "direction: {}", direction)?;
        writeln!(f, "events ({} total):", log.events.len())?;
        writeln!(f)?;

        for (index, event) in log.events.iter().enumerate() {
            // Number events from 1 rather than 0.
            writeln!(f, "{:0>3} {:?}", index + 1, event)?;
        }
        Ok(())
    }
}