1pub trait EffectHandler: Send + Sync {
18 fn handler_identity(&self) -> String {
20 crate::session::DEFAULT_HANDLER_ID.to_string()
21 }
22
23 #[allow(clippy::too_many_lines)]
33 fn handle_effect(&self, request: EffectRequest) -> EffectOutcome {
34 if let Err(failure) = request.metadata.validate() {
35 return EffectOutcome::failure(failure);
36 }
37
38 match request.body {
39 EffectRequestBody::SendDecision {
40 role,
41 partner,
42 label,
43 state,
44 payload,
45 } => {
46 let Some(sid) = request.session else {
47 return EffectOutcome::failure(EffectFailure::contract_violation(
48 "send_decision request is missing session",
49 ));
50 };
51 match self.send_decision(SendDecisionInput {
52 sid,
53 role: &role,
54 partner: &partner,
55 label: &label,
56 state: &state,
57 payload,
58 }) {
59 EffectResult::Success(decision) => {
60 EffectOutcome::success(EffectResponse::SendDecision { decision })
61 }
62 EffectResult::Blocked => EffectOutcome::blocked(),
63 EffectResult::Failure(failure) => EffectOutcome::failure(failure),
64 }
65 }
66 EffectRequestBody::Receive {
67 role,
68 partner,
69 label,
70 state,
71 payload,
72 } => {
73 let mut state = state;
74 match self.handle_recv(&role, &partner, &label, &mut state, &payload) {
75 EffectResult::Success(()) => {
76 EffectOutcome::success(EffectResponse::Receive { state })
77 }
78 EffectResult::Blocked => EffectOutcome::blocked(),
79 EffectResult::Failure(failure) => EffectOutcome::failure(failure),
80 }
81 }
82 EffectRequestBody::Choose {
83 role,
84 partner,
85 labels,
86 state,
87 } => match self.handle_choose(&role, &partner, &labels, &state) {
88 EffectResult::Success(label) => {
89 EffectOutcome::success(EffectResponse::Choose { label })
90 }
91 EffectResult::Blocked => EffectOutcome::blocked(),
92 EffectResult::Failure(failure) => EffectOutcome::failure(failure),
93 },
94 EffectRequestBody::InvokeStep { role, state } => {
95 let mut state = state;
96 match self.step(&role, &mut state) {
97 EffectResult::Success(()) => {
98 EffectOutcome::success(EffectResponse::InvokeStep { state })
99 }
100 EffectResult::Blocked => EffectOutcome::blocked(),
101 EffectResult::Failure(failure) => EffectOutcome::failure(failure),
102 }
103 }
104 EffectRequestBody::Acquire { role, layer, state } => {
105 let Some(sid) = request.session else {
106 return EffectOutcome::failure(EffectFailure::contract_violation(
107 "acquire request is missing session",
108 ));
109 };
110 match self.handle_acquire(sid, &role, &layer, &state) {
111 EffectResult::Success(evidence) => {
112 EffectOutcome::success(EffectResponse::Acquire { evidence })
113 }
114 EffectResult::Blocked => EffectOutcome::blocked(),
115 EffectResult::Failure(failure) => EffectOutcome::failure(failure),
116 }
117 }
118 EffectRequestBody::Release {
119 role,
120 layer,
121 evidence,
122 state,
123 } => {
124 let Some(sid) = request.session else {
125 return EffectOutcome::failure(EffectFailure::contract_violation(
126 "release request is missing session",
127 ));
128 };
129 match self.handle_release(sid, &role, &layer, &evidence, &state) {
130 EffectResult::Success(()) => EffectOutcome::success(EffectResponse::Release),
131 EffectResult::Blocked => EffectOutcome::blocked(),
132 EffectResult::Failure(failure) => EffectOutcome::failure(failure),
133 }
134 }
135 EffectRequestBody::TopologyEvents { tick } => match self.topology_events(tick) {
136 EffectResult::Success(events) => {
137 EffectOutcome::success(EffectResponse::TopologyEvents { events })
138 }
139 EffectResult::Blocked => EffectOutcome::blocked(),
140 EffectResult::Failure(failure) => EffectOutcome::failure(failure),
141 },
142 EffectRequestBody::WalSync { sync } => match self.wal_sync(&sync) {
143 EffectResult::Success(()) => EffectOutcome::success(EffectResponse::WalSync),
144 EffectResult::Blocked => EffectOutcome::blocked(),
145 EffectResult::Failure(failure) => EffectOutcome::failure(failure),
146 },
147 EffectRequestBody::OutputConditionHint { role, state } => {
148 let Some(sid) = request.session else {
149 return EffectOutcome::failure(EffectFailure::contract_violation(
150 "output_condition_hint request is missing session",
151 ));
152 };
153 let hint = self.output_condition_hint(sid, &role, &state);
154 EffectOutcome::success(EffectResponse::OutputConditionHint { hint })
155 }
156 }
157 }
158
159 fn handle_send(
172 &self,
173 role: &str,
174 partner: &str,
175 label: &str,
176 state: &[Value],
177 ) -> EffectResult<Value>;
178
179 fn send_decision(&self, input: SendDecisionInput<'_>) -> EffectResult<SendDecision> {
186 if let Some(payload) = input.payload {
187 EffectResult::success(SendDecision::Deliver(payload))
188 } else {
189 self.handle_send(input.role, input.partner, input.label, input.state)
190 .map_success(SendDecision::Deliver)
191 }
192 }
193
194 fn handle_recv(
205 &self,
206 role: &str,
207 partner: &str,
208 label: &str,
209 state: &mut Vec<Value>,
210 payload: &Value,
211 ) -> EffectResult<()>;
212
213 fn handle_choose(
228 &self,
229 role: &str,
230 partner: &str,
231 labels: &[String],
232 state: &[Value],
233 ) -> EffectResult<String>;
234
235 fn step(&self, role: &str, state: &mut Vec<Value>) -> EffectResult<()>;
241
242 fn handle_acquire(
247 &self,
248 _sid: SessionId,
249 _role: &str,
250 _layer: &str,
251 _state: &[Value],
252 ) -> EffectResult<Value> {
253 EffectResult::success(Value::Unit)
254 }
255
256 fn handle_release(
258 &self,
259 _sid: SessionId,
260 _role: &str,
261 _layer: &str,
262 _evidence: &Value,
263 _state: &[Value],
264 ) -> EffectResult<()> {
265 EffectResult::success(())
266 }
267
268 fn supports_wal_sync(&self) -> bool {
270 false
271 }
272
273 fn wal_sync(&self, _sync: &crate::durable::WalSyncRequest) -> EffectResult<()> {
275 EffectResult::failure(EffectFailure::contract_violation(
276 "wal_sync requires an AgreementWalHandler wrapper",
277 ))
278 }
279
280 fn topology_events(&self, _tick: u64) -> EffectResult<Vec<TopologyPerturbation>> {
289 EffectResult::success(Vec::new())
290 }
291
292 fn output_condition_hint(
297 &self,
298 _sid: SessionId,
299 _role: &str,
300 _state: &[Value],
301 ) -> Option<OutputConditionHint> {
302 None
303 }
304}
305
306impl<T: EffectHandler + ?Sized> EffectHandler for &T {
307 fn handler_identity(&self) -> String {
308 (**self).handler_identity()
309 }
310
311 fn handle_effect(&self, request: EffectRequest) -> EffectOutcome {
312 (**self).handle_effect(request)
313 }
314
315 fn handle_send(
316 &self,
317 role: &str,
318 partner: &str,
319 label: &str,
320 state: &[Value],
321 ) -> EffectResult<Value> {
322 (**self).handle_send(role, partner, label, state)
323 }
324
325 fn send_decision(&self, input: SendDecisionInput<'_>) -> EffectResult<SendDecision> {
326 (**self).send_decision(input)
327 }
328
329 fn handle_recv(
330 &self,
331 role: &str,
332 partner: &str,
333 label: &str,
334 state: &mut Vec<Value>,
335 payload: &Value,
336 ) -> EffectResult<()> {
337 (**self).handle_recv(role, partner, label, state, payload)
338 }
339
340 fn handle_choose(
341 &self,
342 role: &str,
343 partner: &str,
344 labels: &[String],
345 state: &[Value],
346 ) -> EffectResult<String> {
347 (**self).handle_choose(role, partner, labels, state)
348 }
349
350 fn step(&self, role: &str, state: &mut Vec<Value>) -> EffectResult<()> {
351 (**self).step(role, state)
352 }
353
354 fn handle_acquire(
355 &self,
356 sid: SessionId,
357 role: &str,
358 layer: &str,
359 state: &[Value],
360 ) -> EffectResult<Value> {
361 (**self).handle_acquire(sid, role, layer, state)
362 }
363
364 fn handle_release(
365 &self,
366 sid: SessionId,
367 role: &str,
368 layer: &str,
369 evidence: &Value,
370 state: &[Value],
371 ) -> EffectResult<()> {
372 (**self).handle_release(sid, role, layer, evidence, state)
373 }
374
375 fn topology_events(&self, tick: u64) -> EffectResult<Vec<TopologyPerturbation>> {
376 (**self).topology_events(tick)
377 }
378
379 fn output_condition_hint(
380 &self,
381 sid: SessionId,
382 role: &str,
383 state: &[Value],
384 ) -> Option<OutputConditionHint> {
385 (**self).output_condition_hint(sid, role, state)
386 }
387
388 fn supports_wal_sync(&self) -> bool {
389 (**self).supports_wal_sync()
390 }
391
392 fn wal_sync(&self, sync: &crate::durable::WalSyncRequest) -> EffectResult<()> {
393 (**self).wal_sync(sync)
394 }
395}