pub trait EffectHandler: Send + Sync {
fn handler_identity(&self) -> String {
crate::session::DEFAULT_HANDLER_ID.to_string()
}
#[allow(clippy::too_many_lines)]
fn handle_effect(&self, request: EffectRequest) -> EffectOutcome {
if let Err(failure) = request.metadata.validate() {
return EffectOutcome::failure(failure);
}
match request.body {
EffectRequestBody::SendDecision {
role,
partner,
label,
state,
payload,
} => {
let Some(sid) = request.session else {
return EffectOutcome::failure(EffectFailure::contract_violation(
"send_decision request is missing session",
));
};
match self.send_decision(SendDecisionInput {
sid,
role: &role,
partner: &partner,
label: &label,
state: &state,
payload,
}) {
EffectResult::Success(decision) => {
EffectOutcome::success(EffectResponse::SendDecision { decision })
}
EffectResult::Blocked => EffectOutcome::blocked(),
EffectResult::Failure(failure) => EffectOutcome::failure(failure),
}
}
EffectRequestBody::Receive {
role,
partner,
label,
state,
payload,
} => {
let mut state = state;
match self.handle_recv(&role, &partner, &label, &mut state, &payload) {
EffectResult::Success(()) => {
EffectOutcome::success(EffectResponse::Receive { state })
}
EffectResult::Blocked => EffectOutcome::blocked(),
EffectResult::Failure(failure) => EffectOutcome::failure(failure),
}
}
EffectRequestBody::Choose {
role,
partner,
labels,
state,
} => match self.handle_choose(&role, &partner, &labels, &state) {
EffectResult::Success(label) => {
EffectOutcome::success(EffectResponse::Choose { label })
}
EffectResult::Blocked => EffectOutcome::blocked(),
EffectResult::Failure(failure) => EffectOutcome::failure(failure),
},
EffectRequestBody::InvokeStep { role, state } => {
let mut state = state;
match self.step(&role, &mut state) {
EffectResult::Success(()) => {
EffectOutcome::success(EffectResponse::InvokeStep { state })
}
EffectResult::Blocked => EffectOutcome::blocked(),
EffectResult::Failure(failure) => EffectOutcome::failure(failure),
}
}
EffectRequestBody::Acquire { role, layer, state } => {
let Some(sid) = request.session else {
return EffectOutcome::failure(EffectFailure::contract_violation(
"acquire request is missing session",
));
};
match self.handle_acquire(sid, &role, &layer, &state) {
EffectResult::Success(evidence) => {
EffectOutcome::success(EffectResponse::Acquire { evidence })
}
EffectResult::Blocked => EffectOutcome::blocked(),
EffectResult::Failure(failure) => EffectOutcome::failure(failure),
}
}
EffectRequestBody::Release {
role,
layer,
evidence,
state,
} => {
let Some(sid) = request.session else {
return EffectOutcome::failure(EffectFailure::contract_violation(
"release request is missing session",
));
};
match self.handle_release(sid, &role, &layer, &evidence, &state) {
EffectResult::Success(()) => EffectOutcome::success(EffectResponse::Release),
EffectResult::Blocked => EffectOutcome::blocked(),
EffectResult::Failure(failure) => EffectOutcome::failure(failure),
}
}
EffectRequestBody::TopologyEvents { tick } => match self.topology_events(tick) {
EffectResult::Success(events) => {
EffectOutcome::success(EffectResponse::TopologyEvents { events })
}
EffectResult::Blocked => EffectOutcome::blocked(),
EffectResult::Failure(failure) => EffectOutcome::failure(failure),
},
EffectRequestBody::WalSync { sync } => match self.wal_sync(&sync) {
EffectResult::Success(()) => EffectOutcome::success(EffectResponse::WalSync),
EffectResult::Blocked => EffectOutcome::blocked(),
EffectResult::Failure(failure) => EffectOutcome::failure(failure),
},
EffectRequestBody::OutputConditionHint { role, state } => {
let Some(sid) = request.session else {
return EffectOutcome::failure(EffectFailure::contract_violation(
"output_condition_hint request is missing session",
));
};
let hint = self.output_condition_hint(sid, &role, &state);
EffectOutcome::success(EffectResponse::OutputConditionHint { hint })
}
}
}
fn handle_send(
&self,
role: &str,
partner: &str,
label: &str,
state: &[Value],
) -> EffectResult<Value>;
fn send_decision(&self, input: SendDecisionInput<'_>) -> EffectResult<SendDecision> {
if let Some(payload) = input.payload {
EffectResult::success(SendDecision::Deliver(payload))
} else {
self.handle_send(input.role, input.partner, input.label, input.state)
.map_success(SendDecision::Deliver)
}
}
fn handle_recv(
&self,
role: &str,
partner: &str,
label: &str,
state: &mut Vec<Value>,
payload: &Value,
) -> EffectResult<()>;
fn handle_choose(
&self,
role: &str,
partner: &str,
labels: &[String],
state: &[Value],
) -> EffectResult<String>;
fn step(&self, role: &str, state: &mut Vec<Value>) -> EffectResult<()>;
fn handle_acquire(
&self,
_sid: SessionId,
_role: &str,
_layer: &str,
_state: &[Value],
) -> EffectResult<Value> {
EffectResult::success(Value::Unit)
}
fn handle_release(
&self,
_sid: SessionId,
_role: &str,
_layer: &str,
_evidence: &Value,
_state: &[Value],
) -> EffectResult<()> {
EffectResult::success(())
}
fn supports_wal_sync(&self) -> bool {
false
}
fn wal_sync(&self, _sync: &crate::durable::WalSyncRequest) -> EffectResult<()> {
EffectResult::failure(EffectFailure::contract_violation(
"wal_sync requires an AgreementWalHandler wrapper",
))
}
fn topology_events(&self, _tick: u64) -> EffectResult<Vec<TopologyPerturbation>> {
EffectResult::success(Vec::new())
}
fn output_condition_hint(
&self,
_sid: SessionId,
_role: &str,
_state: &[Value],
) -> Option<OutputConditionHint> {
None
}
}
/// Forwarding implementation: a shared reference to any handler (sized or not) is itself a
/// handler.
///
/// Every trait method, including those with defaults, is delegated explicitly to the referenced
/// `T`, so overrides on `T` are honoured rather than replaced by the trait defaults.
impl<T: EffectHandler + ?Sized> EffectHandler for &T {
    /// Delegates to the referenced handler.
    fn handler_identity(&self) -> String {
        T::handler_identity(*self)
    }

    /// Delegates to the referenced handler.
    fn handle_effect(&self, request: EffectRequest) -> EffectOutcome {
        T::handle_effect(*self, request)
    }

    /// Delegates to the referenced handler.
    fn handle_send(
        &self,
        role: &str,
        partner: &str,
        label: &str,
        state: &[Value],
    ) -> EffectResult<Value> {
        T::handle_send(*self, role, partner, label, state)
    }

    /// Delegates to the referenced handler.
    fn send_decision(&self, input: SendDecisionInput<'_>) -> EffectResult<SendDecision> {
        T::send_decision(*self, input)
    }

    /// Delegates to the referenced handler.
    fn handle_recv(
        &self,
        role: &str,
        partner: &str,
        label: &str,
        state: &mut Vec<Value>,
        payload: &Value,
    ) -> EffectResult<()> {
        T::handle_recv(*self, role, partner, label, state, payload)
    }

    /// Delegates to the referenced handler.
    fn handle_choose(
        &self,
        role: &str,
        partner: &str,
        labels: &[String],
        state: &[Value],
    ) -> EffectResult<String> {
        T::handle_choose(*self, role, partner, labels, state)
    }

    /// Delegates to the referenced handler.
    fn step(&self, role: &str, state: &mut Vec<Value>) -> EffectResult<()> {
        T::step(*self, role, state)
    }

    /// Delegates to the referenced handler.
    fn handle_acquire(
        &self,
        sid: SessionId,
        role: &str,
        layer: &str,
        state: &[Value],
    ) -> EffectResult<Value> {
        T::handle_acquire(*self, sid, role, layer, state)
    }

    /// Delegates to the referenced handler.
    fn handle_release(
        &self,
        sid: SessionId,
        role: &str,
        layer: &str,
        evidence: &Value,
        state: &[Value],
    ) -> EffectResult<()> {
        T::handle_release(*self, sid, role, layer, evidence, state)
    }

    /// Delegates to the referenced handler.
    fn supports_wal_sync(&self) -> bool {
        T::supports_wal_sync(*self)
    }

    /// Delegates to the referenced handler.
    fn wal_sync(&self, sync: &crate::durable::WalSyncRequest) -> EffectResult<()> {
        T::wal_sync(*self, sync)
    }

    /// Delegates to the referenced handler.
    fn topology_events(&self, tick: u64) -> EffectResult<Vec<TopologyPerturbation>> {
        T::topology_events(*self, tick)
    }

    /// Delegates to the referenced handler.
    fn output_condition_hint(
        &self,
        sid: SessionId,
        role: &str,
        state: &[Value],
    ) -> Option<OutputConditionHint> {
        T::output_condition_hint(*self, sid, role, state)
    }
}