use crate::ActionContext;
use crate::ActionError;
use crate::ActionFuncResult;
use crate::ActionRegistry;
use crate::Dag;
use crate::DagBuilder;
use crate::Node;
use crate::SagaDag;
use crate::SagaName;
use crate::SagaType;
use serde::Deserialize;
use serde::Serialize;
use std::sync::Arc;
use thiserror::Error;
#[doc(hidden)]
#[derive(Debug)]
pub struct ExampleSagaType {}
impl SagaType for ExampleSagaType {
type ExecContextType = ExampleContext;
}
#[doc(hidden)]
#[derive(Debug, Deserialize, Serialize)]
pub struct ExampleParams {
pub instance_name: String,
pub number_of_instances: u16,
}
#[doc(hidden)]
#[derive(Debug, Default)]
pub struct ExampleContext;
type SagaExampleContext = ActionContext<ExampleSagaType>;
#[derive(Debug, Deserialize, Error, Serialize)]
enum ExampleError {
#[error("example error")]
AnError,
}
type ExFuncResult<T> = ActionFuncResult<T, ActionError>;
#[derive(Debug)]
struct ExampleSubsagaType {}
impl SagaType for ExampleSubsagaType {
type ExecContextType = ExampleContext;
}
#[derive(Debug, Deserialize, Serialize)]
struct ExampleSubsagaParams {
number_of_things: u32,
}
#[derive(Debug, Deserialize, Serialize)]
struct ServerAllocResult {
server_id: u64,
}
impl From<ExampleError> for ActionError {
fn from(t: ExampleError) -> ActionError {
ActionError::action_failed(t)
}
}
mod actions {
use super::ExampleSagaType;
use crate::new_action_noop_undo;
use crate::Action;
use lazy_static::lazy_static;
use std::sync::Arc;
lazy_static! {
pub static ref INSTANCE_CREATE: Arc<dyn Action<ExampleSagaType>> =
new_action_noop_undo(
"instance_create",
super::demo_prov_instance_create,
);
pub static ref VPC_ALLOC_IP: Arc<dyn Action<ExampleSagaType>> =
new_action_noop_undo("vpc_alloc_ip", super::demo_prov_vpc_alloc_ip);
pub static ref VOLUME_CREATE: Arc<dyn Action<ExampleSagaType>> =
new_action_noop_undo(
"volume_create",
super::demo_prov_volume_create,
);
pub static ref INSTANCE_CONFIGURE: Arc<dyn Action<ExampleSagaType>> =
new_action_noop_undo(
"instance_configure",
super::demo_prov_instance_configure,
);
pub static ref VOLUME_ATTACH: Arc<dyn Action<ExampleSagaType>> =
new_action_noop_undo(
"volume_attach",
super::demo_prov_volume_attach,
);
pub static ref INSTANCE_BOOT: Arc<dyn Action<ExampleSagaType>> =
new_action_noop_undo(
"instance_boot",
super::demo_prov_instance_boot,
);
pub static ref PRINT: Arc<dyn Action<ExampleSagaType>> =
new_action_noop_undo("print", super::demo_prov_print);
pub static ref SERVER_PICK: Arc<dyn Action<ExampleSagaType>> =
new_action_noop_undo("server_pick", super::demo_prov_server_pick);
pub static ref SERVER_RESERVE: Arc<dyn Action<ExampleSagaType>> =
new_action_noop_undo(
"server_reserve",
super::demo_prov_server_reserve,
);
}
}
#[doc(hidden)]
pub fn load_example_actions(registry: &mut ActionRegistry<ExampleSagaType>) {
registry.register(actions::INSTANCE_CREATE.clone());
registry.register(actions::VPC_ALLOC_IP.clone());
registry.register(actions::VOLUME_CREATE.clone());
registry.register(actions::INSTANCE_CONFIGURE.clone());
registry.register(actions::VOLUME_ATTACH.clone());
registry.register(actions::INSTANCE_BOOT.clone());
registry.register(actions::PRINT.clone());
registry.register(actions::SERVER_PICK.clone());
registry.register(actions::SERVER_RESERVE.clone());
}
fn server_alloc_subsaga() -> Dag {
let name = SagaName::new("server-alloc");
let mut d = DagBuilder::new(name);
d.append(Node::action(
"server_id",
"ServerPick",
actions::SERVER_PICK.as_ref(),
));
d.append(Node::action(
"server_reserve",
"ServerReserve",
actions::SERVER_RESERVE.as_ref(),
));
d.build().unwrap()
}
#[doc(hidden)]
pub fn make_example_provision_dag(params: ExampleParams) -> Arc<SagaDag> {
let name = SagaName::new("DemoVmProvision");
let mut d = DagBuilder::new(name);
d.append(Node::action(
"instance_id",
"InstanceCreate",
actions::INSTANCE_CREATE.as_ref(),
));
let subsaga_params = ExampleSubsagaParams { number_of_things: 1 };
d.append(Node::constant(
"server_alloc_params",
serde_json::to_value(subsaga_params).unwrap(),
));
d.append_parallel(vec![
Node::action(
"instance_ip",
"VpcAllocIp",
actions::VPC_ALLOC_IP.as_ref(),
),
Node::action(
"volume_id",
"VolumeCreate",
actions::VOLUME_CREATE.as_ref(),
),
Node::subsaga(
"server_alloc",
server_alloc_subsaga(),
"server_alloc_params",
),
]);
d.append(Node::action(
"instance_configure",
"InstanceConfigure",
actions::INSTANCE_CONFIGURE.as_ref(),
));
d.append(Node::action(
"volume_attach",
"VolumeAttach",
actions::VOLUME_ATTACH.as_ref(),
));
d.append(Node::action(
"instance_boot",
"InstanceBoot",
actions::INSTANCE_BOOT.as_ref(),
));
d.append(Node::action("print", "Print", actions::PRINT.as_ref()));
Arc::new(SagaDag::new(
d.build().unwrap(),
serde_json::to_value(params).unwrap(),
))
}
async fn demo_prov_instance_create(
sgctx: SagaExampleContext,
) -> ExFuncResult<u64> {
let params = sgctx.saga_params::<ExampleParams>()?;
eprintln!(
"running action: {} (instance name: {})",
sgctx.node_label(),
params.instance_name
);
let instance_id = 1211u64;
Ok(instance_id)
}
async fn demo_prov_vpc_alloc_ip(
sgctx: SagaExampleContext,
) -> ExFuncResult<String> {
eprintln!("running action: {}", sgctx.node_label());
let instance_id = sgctx.lookup::<u64>("instance_id")?;
assert_eq!(instance_id, 1211);
let ip = String::from("10.120.121.122");
Ok(ip)
}
async fn demo_prov_server_pick(sgctx: SagaExampleContext) -> ExFuncResult<u64> {
eprintln!("running action: {}", sgctx.node_label());
let params = sgctx.saga_params::<ExampleSubsagaParams>()?;
assert_eq!(params.number_of_things, 1);
let server_id = 1212u64;
Ok(server_id)
}
async fn demo_prov_server_reserve(
sgctx: SagaExampleContext,
) -> ExFuncResult<ServerAllocResult> {
eprintln!("running action: {}", sgctx.node_label());
let params = sgctx.saga_params::<ExampleSubsagaParams>()?;
assert_eq!(params.number_of_things, 1);
let server_id = sgctx.lookup::<u64>("server_id")?;
assert_eq!(server_id, 1212);
Ok(ServerAllocResult { server_id })
}
async fn demo_prov_volume_create(
sgctx: SagaExampleContext,
) -> ExFuncResult<u64> {
eprintln!("running action: {}", sgctx.node_label());
assert_eq!(sgctx.lookup::<u64>("instance_id")?, 1211);
let volume_id = 1213u64;
Ok(volume_id)
}
async fn demo_prov_instance_configure(
sgctx: SagaExampleContext,
) -> ExFuncResult<()> {
eprintln!("running action: {}", sgctx.node_label());
assert_eq!(sgctx.lookup::<u64>("instance_id")?, 1211);
let params = sgctx.saga_params::<ExampleParams>()?;
assert_eq!(params.number_of_instances, 1);
assert_eq!(
sgctx.lookup::<ServerAllocResult>("server_alloc")?.server_id,
1212
);
assert_eq!(sgctx.lookup::<u64>("volume_id")?, 1213);
Ok(())
}
async fn demo_prov_volume_attach(
sgctx: SagaExampleContext,
) -> ExFuncResult<()> {
eprintln!("running action: {}", sgctx.node_label());
assert_eq!(sgctx.lookup::<u64>("instance_id")?, 1211);
assert_eq!(sgctx.lookup::<u64>("volume_id")?, 1213);
assert_eq!(
sgctx.lookup::<ServerAllocResult>("server_alloc")?.server_id,
1212
);
Ok(())
}
async fn demo_prov_instance_boot(
sgctx: SagaExampleContext,
) -> ExFuncResult<()> {
eprintln!("running action: {}", sgctx.node_label());
assert_eq!(sgctx.lookup::<u64>("instance_id")?, 1211);
assert_eq!(sgctx.lookup::<u64>("volume_id")?, 1213);
assert_eq!(
sgctx.lookup::<ServerAllocResult>("server_alloc")?.server_id,
1212
);
Ok(())
}
async fn demo_prov_print(sgctx: SagaExampleContext) -> ExFuncResult<String> {
eprintln!("running action: {}", sgctx.node_label());
eprintln!("printing final state:");
let vm_instance_id = sgctx.lookup::<u64>("instance_id")?;
eprintln!(" instance id: {}", vm_instance_id);
let ip = sgctx.lookup::<String>("instance_ip")?;
eprintln!(" IP address: {}", ip);
let volume_id = sgctx.lookup::<u64>("volume_id")?;
eprintln!(" volume id: {}", volume_id);
let server_id =
sgctx.lookup::<ServerAllocResult>("server_alloc")?.server_id;
eprintln!(" server id: {}", server_id);
Ok(String::from("it worked"))
}