Skip to main content

statevec_api/
lib.rs

1// Copyright 2026 Jumpex Technology.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Runtime-facing StateVec host APIs and plugin contracts.
5//!
6//! Domain plugins use this crate to implement [`RuntimePlugin`] and execute
7//! deterministic transactions against a host-provided [`RuntimeHostContext`].
8//! Runtime hosts use the same traits and the exported plugin ABI to load domain
9//! code across a process or dynamic-library boundary.
10
11pub use statevec_model::command::Command;
12use statevec_model::event::GeneratedEventAccess;
13pub use statevec_model::record::RecordKey;
14use statevec_model::record::{GeneratedRecordAccess, RecordKind, SysId};
15use statevec_model::{CommandDefinition, SchemaRegistry};
16
17mod plugin_abi_v1;
18mod throughput_probe;
19pub use plugin_abi_v1::{
20    ExportedRuntimePluginV1Handle, RUNTIME_PLUGIN_ABI_VERSION_V1, RUNTIME_PLUGIN_ENTRY_V1_SYMBOL,
21    RuntimeBytesMutRef, RuntimeBytesMutVisitor, RuntimeBytesRef, RuntimeBytesVisitor,
22    RuntimeCallStatus, RuntimeCommandView, RuntimeErrorBuf, RuntimeErrorKind, RuntimeErrorPhase,
23    RuntimeHostContextV1, RuntimeHostContextV1Adapter, RuntimeHostVTableV1, RuntimePluginApiV1,
24    RuntimePluginEntryV1, RuntimeReadContextV1, RuntimeReadContextV1Adapter, RuntimeReadVTableV1,
25    RuntimeRecordKeyView, RuntimeRecordKeyVisitor, clear_runtime_error, runtime_bytes_slice,
26    runtime_bytes_slice_mut, runtime_error_kind, runtime_error_message, runtime_error_text,
27    runtime_plugin_create_runtime_v1, runtime_plugin_destroy_runtime_v1, runtime_plugin_name_v1,
28    runtime_plugin_on_unload_v1, runtime_plugin_run_tx_v1, runtime_plugin_schema_bytes_v1,
29    runtime_plugin_validate_biz_invariants_v1, write_runtime_error,
30};
31pub use throughput_probe::RuntimeApiProbe;
32use throughput_probe::{
33    on_runtime_host_update_typed_by_pk, on_runtime_host_with_read_typed_by_pk,
34    on_typed_tx_update_or_create_typed_by_pk, on_typed_tx_update_typed_by_pk,
35    on_typed_tx_with_read_typed_by_pk,
36};
37
38/// Logical API compatibility version.
39///
40/// Compatibility is keyed to the stable runtime plugin ABI, not the crate
41/// package version.
42pub const STATEVEC_API_VERSION: &str = "1";
43/// Numeric runtime plugin ABI compatibility version.
44pub const STATEVEC_API_COMPAT_VERSION: u32 = RUNTIME_PLUGIN_ABI_VERSION_V1;
45
46#[cfg(test)]
47mod ut_api_compat_version {
48    #[test]
49    fn api_compat_version_is_not_the_crate_package_version() {
50        assert_eq!(super::STATEVEC_API_VERSION, "1");
51        assert_eq!(
52            super::STATEVEC_API_COMPAT_VERSION,
53            super::RUNTIME_PLUGIN_ABI_VERSION_V1
54        );
55        assert_ne!(super::STATEVEC_API_VERSION, env!("CARGO_PKG_VERSION"));
56    }
57}
58
59/// Error returned by host context operations.
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct RuntimeHostError {
62    /// Human-readable error message.
63    pub message: String,
64}
65
66impl RuntimeHostError {
67    /// Creates a host error from a message.
68    pub fn new(message: impl Into<String>) -> Self {
69        Self {
70            message: message.into(),
71        }
72    }
73}
74
75impl std::fmt::Display for RuntimeHostError {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.write_str(&self.message)
78    }
79}
80
81impl std::error::Error for RuntimeHostError {}
82
83/// Error returned while creating or loading a runtime plugin.
84#[derive(Debug, Clone, PartialEq, Eq)]
85pub struct RuntimePluginLoadError {
86    /// Human-readable error message.
87    pub message: String,
88}
89
90impl RuntimePluginLoadError {
91    /// Creates a plugin load error from a message.
92    pub fn new(message: impl Into<String>) -> Self {
93        Self {
94            message: message.into(),
95        }
96    }
97}
98
99impl std::fmt::Display for RuntimePluginLoadError {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        f.write_str(&self.message)
102    }
103}
104
105impl std::error::Error for RuntimePluginLoadError {}
106
107/// Error returned by plugin transaction execution.
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct RuntimePluginError {
110    /// Human-readable error message.
111    pub message: String,
112}
113
114impl RuntimePluginError {
115    /// Creates a plugin execution error from a message.
116    pub fn new(message: impl Into<String>) -> Self {
117        Self {
118            message: message.into(),
119        }
120    }
121}
122
123impl std::fmt::Display for RuntimePluginError {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.write_str(&self.message)
126    }
127}
128
129impl std::error::Error for RuntimePluginError {}
130
131/// Error returned when unloading a runtime plugin.
132#[derive(Debug, Clone, PartialEq, Eq)]
133pub struct RuntimePluginUnloadError {
134    /// Human-readable error message.
135    pub message: String,
136}
137
138impl RuntimePluginUnloadError {
139    /// Creates a plugin unload error from a message.
140    pub fn new(message: impl Into<String>) -> Self {
141        Self {
142            message: message.into(),
143        }
144    }
145}
146
147impl std::fmt::Display for RuntimePluginUnloadError {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.write_str(&self.message)
150    }
151}
152
153impl std::error::Error for RuntimePluginUnloadError {}
154
155/// Object-safe host capability boundary used by runtime plugins.
156///
157/// Engine internals such as `runtime_engine::TxAccess` stay on the host side and
158/// implement this trait through an adapter. Plugin-facing code should depend on
159/// this capability surface instead of any concrete engine transaction type.
160pub trait RuntimeHostContext {
161    /// Reads a record by system id and passes its bytes to `f` when found.
162    fn with_read_typed_raw(
163        &self,
164        record_kind: RecordKind,
165        sys_id: SysId,
166        f: &mut dyn FnMut(&[u8]),
167    ) -> Result<bool, RuntimeHostError>;
168
169    /// Reads a record by primary-key bytes and passes its bytes to `f` when found.
170    fn with_read_typed_by_pk_raw(
171        &self,
172        record_kind: RecordKind,
173        pk: &[u8],
174        f: &mut dyn FnMut(&[u8]),
175    ) -> Result<bool, RuntimeHostError>;
176
177    /// Creates a record and initializes its data bytes through `init`.
178    fn create_typed_raw(
179        &mut self,
180        record_kind: RecordKind,
181        init: &mut dyn FnMut(&mut [u8]),
182    ) -> Result<RecordKey, RuntimeHostError>;
183
184    /// Updates a record by primary-key bytes in-place when found.
185    fn update_typed_by_pk_raw(
186        &mut self,
187        record_kind: RecordKind,
188        pk: &[u8],
189        f: &mut dyn FnMut(&mut [u8]),
190    ) -> Result<bool, RuntimeHostError>;
191
192    /// Deletes a record by primary-key bytes.
193    fn delete_by_pk_raw(
194        &mut self,
195        record_kind: RecordKind,
196        pk: &[u8],
197    ) -> Result<bool, RuntimeHostError>;
198
199    /// Emits an event payload from the current transaction.
200    fn emit_typed_event_raw(
201        &mut self,
202        event_kind: u8,
203        payload: &[u8],
204    ) -> Result<(), RuntimeHostError>;
205
206    /// Iterates visible record keys for one record kind.
207    fn for_each_record_key_raw(
208        &self,
209        kind: RecordKind,
210        f: &mut dyn FnMut(RecordKey),
211    ) -> Result<(), RuntimeHostError>;
212
213    /// Emits host-side diagnostic text.
214    fn debug_log(&mut self, _message: String) -> Result<(), RuntimeHostError> {
215        Ok(())
216    }
217}
218
219/// Typed convenience methods layered on top of the raw
220/// [`RuntimeHostContext`] capability boundary.
221///
222/// Plugin authors use this extension trait directly on `&mut dyn
223/// RuntimeHostContext`, while host-side engine code only needs to implement the
224/// raw object-safe methods.
225pub trait RuntimeHostContextExt: RuntimeHostContext {
226    /// Reads a generated record by system id.
227    fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, RuntimeHostError>
228    where
229        R: GeneratedRecordAccess,
230        F: FnOnce(R::Access<'_>) -> T,
231    {
232        let mut f = Some(f);
233        let mut out = None;
234        let found = self.with_read_typed_raw(R::KIND, sys_id, &mut |data| {
235            let apply = f.take().expect("callback invoked more than once");
236            out = Some(apply(R::wrap(data)));
237        })?;
238        Ok(found.then_some(out).flatten())
239    }
240
241    /// Read a record by primary key without mutating it.
242    ///
243    /// Prefer [`RuntimeHostContextExt::update_typed_by_pk`] when the next step
244    /// is to modify the same record. The update path keeps the mutation
245    /// in-place and avoids an extra resolve/read/then-update round-trip.
246    fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, RuntimeHostError>
247    where
248        R: GeneratedRecordAccess,
249        P: AsRef<[u8]>,
250        F: FnOnce(R::Access<'_>) -> T,
251    {
252        on_runtime_host_with_read_typed_by_pk();
253        let mut f = Some(f);
254        let mut out = None;
255        let found = self.with_read_typed_by_pk_raw(R::KIND, pk.as_ref(), &mut |data| {
256            let apply = f.take().expect("callback invoked more than once");
257            out = Some(apply(R::wrap(data)));
258        })?;
259        Ok(found.then_some(out).flatten())
260    }
261
262    /// Creates a generated record.
263    fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, RuntimeHostError>
264    where
265        R: GeneratedRecordAccess,
266        F: for<'b> FnOnce(&mut R::NewBuilder<'b>),
267    {
268        let mut init = Some(init);
269        let key = self.create_typed_raw(R::KIND, &mut |buf| {
270            let apply = init.take().expect("init callback invoked more than once");
271            let mut builder = R::wrap_new(buf);
272            apply(&mut builder);
273        })?;
274        Ok(key)
275    }
276
277    /// Update a record in-place by primary key.
278    ///
279    /// This is the preferred hot-path API when a command needs to mutate an
280    /// existing record. It avoids the read-then-update pattern that would
281    /// otherwise resolve the primary key and touch the same record twice.
282    fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, RuntimeHostError>
283    where
284        R: GeneratedRecordAccess,
285        P: AsRef<[u8]>,
286        F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
287    {
288        on_runtime_host_update_typed_by_pk();
289        let mut f = Some(f);
290        let mut out = None;
291        let found = self.update_typed_by_pk_raw(R::KIND, pk.as_ref(), &mut |buf| {
292            let apply = f.take().expect("update callback invoked more than once");
293            let mut builder = R::wrap_update(buf);
294            out = Some(apply(&mut builder));
295        })?;
296        Ok(found.then_some(out).flatten())
297    }
298
299    /// Updates an existing record by primary key or creates a new one.
300    fn update_or_create_typed_by_pk<R, P, T, FU, FC>(
301        &mut self,
302        pk: P,
303        update: FU,
304        create: FC,
305    ) -> Result<T, RuntimeHostError>
306    where
307        R: GeneratedRecordAccess,
308        P: AsRef<[u8]>,
309        FU: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
310        FC: for<'b> FnOnce(&mut R::NewBuilder<'b>) -> T,
311    {
312        if let Some(value) = self.update_typed_by_pk::<R, P, T, FU>(pk, update)? {
313            return Ok(value);
314        }
315
316        let mut out = None;
317        self.create_typed::<R, _>(|builder| {
318            out = Some(create(builder));
319        })?;
320        Ok(out.expect("create closure must produce a value"))
321    }
322
323    /// Deletes a generated record by primary key.
324    fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, RuntimeHostError>
325    where
326        R: GeneratedRecordAccess,
327        P: AsRef<[u8]>,
328    {
329        self.delete_by_pk_raw(R::KIND, pk.as_ref())
330    }
331
332    /// Emits a generated event payload.
333    fn emit_typed_event<E>(&mut self, payload: Vec<u8>) -> Result<(), RuntimeHostError>
334    where
335        E: GeneratedEventAccess,
336    {
337        self.emit_typed_event_raw(E::KIND, &payload)
338    }
339
340    /// Iterates record keys for one record kind.
341    fn for_each_record_key(
342        &self,
343        kind: RecordKind,
344        f: &mut dyn FnMut(RecordKey),
345    ) -> Result<(), RuntimeHostError> {
346        self.for_each_record_key_raw(kind, &mut |key| f(key))
347    }
348}
349
350impl<T: RuntimeHostContext + ?Sized> RuntimeHostContextExt for T {}
351
352/// Factory used by a runtime host to create configured plugin instances.
353pub trait RuntimePluginFactory {
354    /// Stable plugin name.
355    fn plugin_name(&self) -> &'static str;
356
357    /// Schema registry exposed by this plugin.
358    fn schema_registry(&self) -> SchemaRegistry;
359
360    /// Command definitions exposed by this plugin.
361    fn command_definitions(&self) -> &'static [&'static CommandDefinition] {
362        &[]
363    }
364
365    /// Creates a configured runtime plugin instance.
366    fn create(
367        &self,
368        plugin_config_text: &str,
369    ) -> Result<Box<dyn RuntimePlugin>, RuntimePluginLoadError>;
370}
371
372/// Read-only state access surface for invariant validation.
373///
374/// This is the read-only subset of [`RuntimeHostContext`], provided to
375/// [`RuntimePlugin::validate_biz_invariants`] so that domain-specific business
376/// invariants can be checked without mutating state.
377pub trait BizInvariantReadContext {
378    /// Reads a record by system id and passes its bytes to `f` when found.
379    fn with_read_typed_raw(
380        &self,
381        record_kind: RecordKind,
382        sys_id: SysId,
383        f: &mut dyn FnMut(&[u8]),
384    ) -> Result<bool, RuntimeHostError>;
385
386    /// Reads a record by primary-key bytes and passes its bytes to `f` when found.
387    fn with_read_typed_by_pk_raw(
388        &self,
389        record_kind: RecordKind,
390        pk: &[u8],
391        f: &mut dyn FnMut(&[u8]),
392    ) -> Result<bool, RuntimeHostError>;
393
394    /// Iterates visible record keys for one record kind.
395    fn for_each_record_key_raw(
396        &self,
397        kind: RecordKind,
398        f: &mut dyn FnMut(RecordKey),
399    ) -> Result<(), RuntimeHostError>;
400}
401
402/// Typed convenience methods for [`BizInvariantReadContext`].
403pub trait InvariantReadContextExt: BizInvariantReadContext {
404    /// Reads a generated record by system id.
405    fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, RuntimeHostError>
406    where
407        R: GeneratedRecordAccess,
408        F: FnOnce(R::Access<'_>) -> T,
409    {
410        let mut result = None;
411        let mut f = Some(f);
412        let found = self.with_read_typed_raw(R::KIND, sys_id, &mut |data| {
413            if let Some(f) = f.take() {
414                result = Some(f(R::wrap(data)));
415            }
416        })?;
417        if found { Ok(result) } else { Ok(None) }
418    }
419
420    /// Reads a generated record by primary key.
421    fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, RuntimeHostError>
422    where
423        R: GeneratedRecordAccess,
424        P: AsRef<[u8]>,
425        F: FnOnce(R::Access<'_>) -> T,
426    {
427        let mut result = None;
428        let mut f = Some(f);
429        let found = self.with_read_typed_by_pk_raw(R::KIND, pk.as_ref(), &mut |data| {
430            if let Some(f) = f.take() {
431                result = Some(f(R::wrap(data)));
432            }
433        })?;
434        if found { Ok(result) } else { Ok(None) }
435    }
436}
437
438impl<T: BizInvariantReadContext + ?Sized> InvariantReadContextExt for T {}
439
440/// Domain runtime plugin contract.
441pub trait RuntimePlugin {
442    /// Stable plugin name.
443    fn name(&self) -> &'static str;
444
445    /// Schema registry exposed by this plugin.
446    fn schema_registry(&self) -> SchemaRegistry;
447
448    /// Command definitions exposed by this plugin.
449    fn command_definitions(&self) -> &'static [&'static CommandDefinition] {
450        &[]
451    }
452
453    /// Executes one command transaction.
454    fn run_tx(
455        &self,
456        tx: &mut dyn RuntimeHostContext,
457        command: &dyn RuntimeCommandEnvelope,
458    ) -> Result<(), RuntimePluginError>;
459
460    /// Validate domain-specific business invariants against committed state.
461    ///
462    /// Called at two points:
463    /// - after recovery replay, before declaring recovered state healthy
464    /// - before publishing a new recovery boundary (snapshot publication)
465    ///
466    /// Return `Ok(())` if all invariants hold, or `Err(message)` to block
467    /// the operation. The default implementation performs no checks.
468    fn validate_biz_invariants(&self, _ctx: &dyn BizInvariantReadContext) -> Result<(), String> {
469        Ok(())
470    }
471
472    /// Called before plugin unload so domain code can release resources.
473    fn on_unload(&mut self) -> Result<(), RuntimePluginUnloadError> {
474        Ok(())
475    }
476}
477
478/// Borrowed command envelope passed to runtime plugins.
479pub trait RuntimeCommandEnvelope {
480    /// Returns the command kind.
481    fn command_kind(&self) -> u8;
482    /// Returns the source queue sequence.
483    fn ext_seq(&self) -> u64;
484    /// Returns the source-provided reference time in microseconds.
485    fn ref_ext_time_us(&self) -> u64;
486    /// Returns the encoded command payload.
487    fn payload(&self) -> &[u8];
488}
489
490/// Borrowed runtime command envelope.
491#[derive(Debug, Clone, Copy)]
492pub struct RuntimeCommandRef<'a> {
493    command_kind: u8,
494    ext_seq: u64,
495    ref_ext_time_us: u64,
496    payload: &'a [u8],
497}
498
499impl<'a> RuntimeCommandRef<'a> {
500    /// Creates a borrowed runtime command envelope.
501    #[inline]
502    pub fn new(command_kind: u8, ext_seq: u64, ref_ext_time_us: u64, payload: &'a [u8]) -> Self {
503        Self {
504            command_kind,
505            ext_seq,
506            ref_ext_time_us,
507            payload,
508        }
509    }
510}
511
512impl RuntimeCommandEnvelope for RuntimeCommandRef<'_> {
513    #[inline(always)]
514    fn command_kind(&self) -> u8 {
515        self.command_kind
516    }
517
518    #[inline(always)]
519    fn ext_seq(&self) -> u64 {
520        self.ext_seq
521    }
522
523    #[inline(always)]
524    fn ref_ext_time_us(&self) -> u64 {
525        self.ref_ext_time_us
526    }
527
528    #[inline(always)]
529    fn payload(&self) -> &[u8] {
530        self.payload
531    }
532}
533
534impl RuntimeCommandEnvelope for Command {
535    #[inline(always)]
536    fn command_kind(&self) -> u8 {
537        self.command_kind()
538    }
539
540    #[inline(always)]
541    fn ext_seq(&self) -> u64 {
542        self.ext_seq()
543    }
544
545    #[inline(always)]
546    fn ref_ext_time_us(&self) -> u64 {
547        self.ref_ext_time_us()
548    }
549
550    #[inline(always)]
551    fn payload(&self) -> &[u8] {
552        self.payload()
553    }
554}
555
556/// Read-only raw transaction capability used by typed convenience APIs.
557pub trait TxReadContext {
558    /// Host error type.
559    type Error;
560
561    /// Reads a record by key.
562    fn with_read_raw<T>(
563        &self,
564        key: RecordKey,
565        f: impl FnOnce(&[u8]) -> T,
566    ) -> Result<Option<T>, Self::Error>;
567    /// Iterates record keys for one record kind.
568    fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey));
569}
570
571/// Primary-key lookup capability for raw transaction contexts.
572pub trait TxPkContext: TxReadContext {
573    /// Resolves primary-key bytes to a system id.
574    fn resolve_pk(&self, kind: RecordKind, pk: &[u8]) -> Result<Option<SysId>, Self::Error>;
575}
576
577/// Raw write capability for transaction contexts.
578pub trait TxWriteContext: TxReadContext {
579    /// Creates a record from encoded data bytes.
580    fn create_raw(&mut self, kind: RecordKind, data: Vec<u8>) -> Result<RecordKey, Self::Error>;
581    /// Updates a record by key.
582    fn update_raw<T>(
583        &mut self,
584        key: RecordKey,
585        f: impl FnOnce(&mut [u8]) -> T,
586    ) -> Result<Option<T>, Self::Error>;
587    /// Deletes a record by key.
588    fn delete_raw(&mut self, key: RecordKey) -> Result<bool, Self::Error>;
589    /// Emits an event payload.
590    fn emit_event_raw(&mut self, event_kind: u8, payload: Vec<u8>);
591    /// Emits host-side diagnostic text.
592    fn debug_log(&mut self, _message: String) {}
593}
594
595/// Raw create capability that accepts a host-assigned system id.
596pub trait TxSysIdCreateContext: TxWriteContext {
597    /// Creates a record with an explicit system id.
598    fn create_with_sys_id_raw(
599        &mut self,
600        kind: RecordKind,
601        sys_id: SysId,
602        data: Vec<u8>,
603    ) -> Result<RecordKey, Self::Error>;
604}
605
606/// Full raw transaction context capability set.
607pub trait TxContext: TxPkContext + TxSysIdCreateContext {}
608
609impl<T: TxPkContext + TxSysIdCreateContext + ?Sized> TxContext for T {}
610
611/// Typed transaction convenience API over generated StateVec accessors.
612pub trait TypedTxContext {
613    /// Host error type.
614    type Error;
615
616    /// Reads a generated record by system id.
617    fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, Self::Error>
618    where
619        R: GeneratedRecordAccess,
620        F: FnOnce(R::Access<'_>) -> T;
621
622    /// Read a record by primary key without mutating it.
623    ///
624    /// Prefer [`TypedTxContext::update_typed_by_pk`] when the next step is to
625    /// modify the same record. The update path keeps the operation in-place and
626    /// avoids the extra resolve/read/then-update round-trip that this read-first
627    /// pattern introduces on hot paths.
628    fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, Self::Error>
629    where
630        R: GeneratedRecordAccess,
631        P: AsRef<[u8]>,
632        F: FnOnce(R::Access<'_>) -> T;
633
634    /// Creates a generated record.
635    fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, Self::Error>
636    where
637        R: GeneratedRecordAccess,
638        F: for<'b> FnOnce(&mut R::NewBuilder<'b>);
639
640    /// Update a record in-place by primary key.
641    ///
642    /// This is the preferred hot-path API when a command needs to modify an
643    /// existing record. It avoids the read-then-update pattern that would
644    /// otherwise resolve the primary key and touch the same record twice.
645    fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, Self::Error>
646    where
647        R: GeneratedRecordAccess,
648        P: AsRef<[u8]>,
649        F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T;
650
651    /// Updates an existing record by primary key or creates a new one.
652    fn update_or_create_typed_by_pk<R, P, T, FU, FC>(
653        &mut self,
654        pk: P,
655        update: FU,
656        create: FC,
657    ) -> Result<T, Self::Error>
658    where
659        R: GeneratedRecordAccess,
660        P: AsRef<[u8]>,
661        FU: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
662        FC: for<'b> FnOnce(&mut R::NewBuilder<'b>) -> T,
663    {
664        if let Some(value) = self.update_typed_by_pk::<R, P, T, FU>(pk, update)? {
665            return Ok(value);
666        }
667        let mut out = None;
668        self.create_typed::<R, _>(|builder| {
669            out = Some(create(builder));
670        })?;
671        Ok(out.expect("create closure must produce a value"))
672    }
673
674    /// Deletes a generated record by primary key.
675    fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, Self::Error>
676    where
677        R: GeneratedRecordAccess,
678        P: AsRef<[u8]>;
679
680    /// Emits a generated event payload.
681    fn emit_typed_event<E>(&mut self, payload: Vec<u8>)
682    where
683        E: GeneratedEventAccess;
684
685    /// Iterates record keys for one record kind.
686    fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey));
687
688    /// Emits host-side diagnostic text.
689    fn debug_log(&mut self, _message: String) {}
690}
691
692impl<Ctx: TxContext + ?Sized> TypedTxContext for Ctx {
693    type Error = <Ctx as TxReadContext>::Error;
694
695    fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, Self::Error>
696    where
697        R: GeneratedRecordAccess,
698        F: FnOnce(R::Access<'_>) -> T,
699    {
700        TxReadContext::with_read_raw(
701            self,
702            RecordKey {
703                kind: R::KIND,
704                sys_id,
705            },
706            |data| f(R::wrap(data)),
707        )
708    }
709
710    fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, Self::Error>
711    where
712        R: GeneratedRecordAccess,
713        P: AsRef<[u8]>,
714        F: FnOnce(R::Access<'_>) -> T,
715    {
716        on_typed_tx_with_read_typed_by_pk();
717        let Some(sys_id) = TxPkContext::resolve_pk(self, R::KIND, pk.as_ref())? else {
718            return Ok(None);
719        };
720        TypedTxContext::with_read_typed::<R, T, F>(self, sys_id, f)
721    }
722
723    fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, Self::Error>
724    where
725        R: GeneratedRecordAccess,
726        F: for<'b> FnOnce(&mut R::NewBuilder<'b>),
727    {
728        let mut data = vec![0u8; R::DATA_LEN];
729        init(&mut R::wrap_new(&mut data));
730        TxWriteContext::create_raw(self, R::KIND, data)
731    }
732
733    fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, Self::Error>
734    where
735        R: GeneratedRecordAccess,
736        P: AsRef<[u8]>,
737        F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
738    {
739        on_typed_tx_update_typed_by_pk();
740        let Some(sys_id) = TxPkContext::resolve_pk(self, R::KIND, pk.as_ref())? else {
741            return Ok(None);
742        };
743        TxWriteContext::update_raw(
744            self,
745            RecordKey {
746                kind: R::KIND,
747                sys_id,
748            },
749            |data| {
750                let mut builder = R::wrap_update(data);
751                f(&mut builder)
752            },
753        )
754    }
755
756    fn update_or_create_typed_by_pk<R, P, T, FU, FC>(
757        &mut self,
758        pk: P,
759        update: FU,
760        create: FC,
761    ) -> Result<T, Self::Error>
762    where
763        R: GeneratedRecordAccess,
764        P: AsRef<[u8]>,
765        FU: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
766        FC: for<'b> FnOnce(&mut R::NewBuilder<'b>) -> T,
767    {
768        on_typed_tx_update_or_create_typed_by_pk();
769        if let Some(value) = self.update_typed_by_pk::<R, P, T, FU>(pk, update)? {
770            return Ok(value);
771        }
772
773        let mut out = None;
774        self.create_typed::<R, _>(|builder| {
775            out = Some(create(builder));
776        })?;
777        Ok(out.expect("create closure must produce a value"))
778    }
779
780    fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, Self::Error>
781    where
782        R: GeneratedRecordAccess,
783        P: AsRef<[u8]>,
784    {
785        let Some(sys_id) = TxPkContext::resolve_pk(self, R::KIND, pk.as_ref())? else {
786            return Ok(false);
787        };
788        TxWriteContext::delete_raw(
789            self,
790            RecordKey {
791                kind: R::KIND,
792                sys_id,
793            },
794        )
795    }
796
797    fn emit_typed_event<E>(&mut self, payload: Vec<u8>)
798    where
799        E: GeneratedEventAccess,
800    {
801        TxWriteContext::emit_event_raw(self, E::KIND, payload);
802    }
803
804    fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey)) {
805        TxReadContext::for_each_record_key(self, kind, f);
806    }
807
808    fn debug_log(&mut self, message: String) {
809        TxWriteContext::debug_log(self, message);
810    }
811}
812
813/// Bridge `dyn RuntimeHostContext` to [`TypedTxContext`] so that
814/// `command_dispatch!`-generated code can operate on plugin-facing host
815/// adapters without requiring engine-internal raw transaction capabilities.
816impl TypedTxContext for dyn RuntimeHostContext + '_ {
817    type Error = RuntimeHostError;
818
819    fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, Self::Error>
820    where
821        R: GeneratedRecordAccess,
822        F: FnOnce(R::Access<'_>) -> T,
823    {
824        RuntimeHostContextExt::with_read_typed::<R, T, F>(self, sys_id, f)
825    }
826
827    fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, Self::Error>
828    where
829        R: GeneratedRecordAccess,
830        P: AsRef<[u8]>,
831        F: FnOnce(R::Access<'_>) -> T,
832    {
833        RuntimeHostContextExt::with_read_typed_by_pk::<R, P, T, F>(self, pk, f)
834    }
835
836    fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, Self::Error>
837    where
838        R: GeneratedRecordAccess,
839        F: for<'b> FnOnce(&mut R::NewBuilder<'b>),
840    {
841        RuntimeHostContextExt::create_typed::<R, F>(self, init)
842    }
843
844    fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, Self::Error>
845    where
846        R: GeneratedRecordAccess,
847        P: AsRef<[u8]>,
848        F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
849    {
850        RuntimeHostContextExt::update_typed_by_pk::<R, P, T, F>(self, pk, f)
851    }
852
853    fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, Self::Error>
854    where
855        R: GeneratedRecordAccess,
856        P: AsRef<[u8]>,
857    {
858        RuntimeHostContextExt::delete_by_pk::<R, P>(self, pk)
859    }
860
861    fn emit_typed_event<E>(&mut self, payload: Vec<u8>)
862    where
863        E: GeneratedEventAccess,
864    {
865        RuntimeHostContextExt::emit_typed_event::<E>(self, payload)
866            .expect("host emit_typed_event_raw failed");
867    }
868
869    fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey)) {
870        let _ = RuntimeHostContextExt::for_each_record_key(self, kind, f);
871    }
872
873    fn debug_log(&mut self, message: String) {
874        let _ = RuntimeHostContext::debug_log(self, message);
875    }
876}
877
878/// Minimal producer-side contract for a single globally ordered queue.
879///
880/// The producer appends one opaque record payload and receives the queue's
881/// acceptance sequence (`ext_seq`) back. This is the `accepted` boundary.
882pub trait QueueProducer {
883    type Error;
884
885    fn append(&mut self, record: &[u8]) -> Result<u64, Self::Error>;
886}
887
888/// Minimal consumer-side contract for a single globally ordered queue.
889///
890/// `poll()` yields records in source order. `commit_through(ext_seq)` advances
891/// the durable consumed boundary only after downstream committed durability has
892/// been established.
893#[derive(Debug, Clone, PartialEq, Eq)]
894pub struct QueueRecord<T> {
895    /// Queue-level source sequence.
896    pub ext_seq: u64,
897    /// Source-provided reference time in microseconds.
898    pub ref_ext_time_us: u64,
899    /// Opaque record payload.
900    pub record: T,
901}
902
903/// Consumer-side contract for a single globally ordered queue.
904pub trait QueueConsumer {
905    /// Opaque record payload type.
906    type Record;
907    /// Queue error type.
908    type Error;
909
910    /// Polls the next source record if one is available.
911    fn poll(&mut self) -> Result<Option<QueueRecord<Self::Record>>, Self::Error>;
912    /// Advances the durable consumed boundary through `ext_seq`.
913    fn commit_through(&mut self, ext_seq: u64) -> Result<(), Self::Error>;
914}
915
916/// Resume metadata query for queue consumers.
917pub trait QueueConsumerResume {
918    /// Queue error type.
919    type Error;
920
921    /// Returns the next source sequence to consume when known.
922    fn resume_next_ext_seq(&mut self) -> Result<Option<u64>, Self::Error>;
923}
924
925/// Query surface for command outcomes keyed by queue sequence.
926pub trait CommittedResultQuery {
927    /// Query error type.
928    type Error;
929
930    /// Returns the committed status for an accepted source sequence.
931    fn query_committed_by_ext_seq(
932        &mut self,
933        ext_seq: u64,
934    ) -> Result<Option<CommittedStatus>, Self::Error>;
935}
936
937impl<F, E> CommittedResultQuery for F
938where
939    F: FnMut(u64) -> Result<Option<CommittedStatus>, E>,
940{
941    type Error = E;
942
943    fn query_committed_by_ext_seq(
944        &mut self,
945        ext_seq: u64,
946    ) -> Result<Option<CommittedStatus>, Self::Error> {
947        self(ext_seq)
948    }
949}
950
951/// Version byte used by submit request queue records.
952pub const SUBMIT_REQUEST_RECORD_VERSION: u8 = 1;
953/// Fixed header: version(1) + command_kind(1) + payload_len(4).
954const SUBMIT_REQUEST_HEADER_LEN: usize = 6;
955/// Offset to the payload_len field within the submit request header.
956const SUBMIT_REQUEST_PAYLOAD_LEN_OFFSET: usize = 2;
957
958/// Version byte used by committed result queue records.
959pub const COMMITTED_RESULT_RECORD_VERSION: u8 = 2;
960/// Total: version(1) + ext_seq(8) + status_tag(1) + status_value(8) = 18.
961pub const COMMITTED_RESULT_RECORD_LEN: usize = 18;
962const COMMITTED_RESULT_STATUS_TAG_OFFSET: usize = 9;
963const COMMITTED_STATUS_TAG_COMMITTED: u8 = 1;
964const COMMITTED_STATUS_TAG_REJECTED: u8 = 2;
965
966/// Encoded submit request written to ingress queues.
967#[derive(Debug, Clone, PartialEq, Eq)]
968pub struct SubmitRequest {
969    /// Command kind.
970    pub command_kind: u8,
971    /// Encoded command payload.
972    pub payload: Vec<u8>,
973}
974
975impl SubmitRequest {
976    /// Creates a submit request.
977    pub fn new(command_kind: u8, payload: Vec<u8>) -> Self {
978        Self {
979            command_kind,
980            payload,
981        }
982    }
983}
984
985/// Receipt returned when a command is accepted by the queue.
986#[derive(Debug, Clone, Copy, PartialEq, Eq)]
987pub struct QueueReceipt {
988    /// Queue-level acceptance identifier assigned when the command is accepted.
989    pub ext_seq: u64,
990}
991
992/// Receipt returned when a command has a determined final outcome.
993#[derive(Debug, Clone, Copy, PartialEq, Eq)]
994pub struct CommittedReceipt {
995    /// Queue-level acceptance identifier for the command that reached a
996    /// determined final outcome.
997    pub ext_seq: u64,
998    /// Determined final outcome keyed by `ext_seq`.
999    pub status: CommittedStatus,
1000}
1001
1002/// Fixed committed-result record.
1003#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1004pub struct CommittedResultRecord {
1005    /// Queue-level acceptance identifier for the command that reached a
1006    /// determined final outcome.
1007    pub ext_seq: u64,
1008    /// Determined final outcome published/queryable by `ext_seq`.
1009    pub status: CommittedStatus,
1010}
1011
1012/// Domain/runtime rejection category encoded in committed result records.
1013///
1014/// Codes `100..=199` are reserved for runtime execution failures that are
1015/// still reported as determined command outcomes.
1016#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1017#[repr(u16)]
1018pub enum RejectedErrorCode {
1019    /// Domain plugin rejected the command deterministically.
1020    CommandRejected = 1,
1021    /// Runtime trapped a panic while executing the command.
1022    RuntimePanic = 103,
1023}
1024
1025impl RejectedErrorCode {
1026    /// Encodes the error code as `u16`.
1027    pub fn to_u16(self) -> u16 {
1028        self as u16
1029    }
1030
1031    /// Decodes the error code from `u16`.
1032    pub fn from_u16(value: u16) -> Option<Self> {
1033        match value {
1034            1 => Some(Self::CommandRejected),
1035            103 => Some(Self::RuntimePanic),
1036            _ => None,
1037        }
1038    }
1039}
1040
1041/// Determined command outcome.
1042#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1043pub enum CommittedStatus {
1044    /// Determined final outcome with state delta and allocated `tx_seq`.
1045    Committed { tx_seq: u64 },
1046    /// Determined final outcome without state delta but still tied to the
1047    /// command's accepted `ext_seq`.
1048    Rejected { error_code: RejectedErrorCode },
1049}
1050
1051/// Error returned when queue codec records cannot be encoded or decoded.
1052#[derive(Debug, Clone, PartialEq, Eq)]
1053pub enum QueueCodecError {
1054    /// Input ended before all required fields were present.
1055    Truncated,
1056    /// Input had extra bytes after the expected record length.
1057    TrailingBytes { expected: usize, actual: usize },
1058    /// A field length does not fit the codec.
1059    FieldTooLarge { field: &'static str, len: u64 },
1060    /// Record version does not match this codec.
1061    UnsupportedVersion { expected: u8, found: u8 },
1062    /// Field value is not in the valid domain.
1063    InvalidFieldValue { field: &'static str, value: u64 },
1064}
1065
1066impl std::fmt::Display for QueueCodecError {
1067    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1068        match self {
1069            Self::Truncated => write!(f, "truncated queue record"),
1070            Self::TrailingBytes { expected, actual } => {
1071                write!(
1072                    f,
1073                    "queue record has trailing bytes: expected {expected}, actual {actual}"
1074                )
1075            }
1076            Self::FieldTooLarge { field, len } => {
1077                write!(f, "queue record field '{field}' too large: {len} bytes")
1078            }
1079            Self::UnsupportedVersion { expected, found } => {
1080                write!(
1081                    f,
1082                    "unsupported queue record version: expected {expected}, found {found}"
1083                )
1084            }
1085            Self::InvalidFieldValue { field, value } => {
1086                write!(f, "invalid value for queue record field '{field}': {value}")
1087            }
1088        }
1089    }
1090}
1091
1092impl std::error::Error for QueueCodecError {}
1093
1094/// Encodes a submit request queue record.
1095pub fn encode_submit_request(request: &SubmitRequest) -> Result<Vec<u8>, QueueCodecError> {
1096    let payload_len =
1097        u32::try_from(request.payload.len()).map_err(|_| QueueCodecError::FieldTooLarge {
1098            field: "payload",
1099            len: request.payload.len() as u64,
1100        })?;
1101    let mut out = Vec::with_capacity(SUBMIT_REQUEST_HEADER_LEN + request.payload.len());
1102    out.push(SUBMIT_REQUEST_RECORD_VERSION);
1103    out.push(request.command_kind);
1104    out.extend_from_slice(&payload_len.to_le_bytes());
1105    out.extend_from_slice(&request.payload);
1106    Ok(out)
1107}
1108
1109/// Decodes a submit request queue record.
1110pub fn decode_submit_request(bytes: &[u8]) -> Result<SubmitRequest, QueueCodecError> {
1111    if bytes.len() < SUBMIT_REQUEST_HEADER_LEN {
1112        return Err(QueueCodecError::Truncated);
1113    }
1114    let version = bytes[0];
1115    if version != SUBMIT_REQUEST_RECORD_VERSION {
1116        return Err(QueueCodecError::UnsupportedVersion {
1117            expected: SUBMIT_REQUEST_RECORD_VERSION,
1118            found: version,
1119        });
1120    }
1121    let command_kind = bytes[1];
1122    let payload_len = u32::from_le_bytes(
1123        bytes[SUBMIT_REQUEST_PAYLOAD_LEN_OFFSET..SUBMIT_REQUEST_PAYLOAD_LEN_OFFSET + 4]
1124            .try_into()
1125            .map_err(|_| QueueCodecError::Truncated)?,
1126    ) as usize;
1127    let payload_off = SUBMIT_REQUEST_HEADER_LEN;
1128    let expected = payload_off + payload_len;
1129    if bytes.len() < expected {
1130        return Err(QueueCodecError::Truncated);
1131    }
1132    if bytes.len() != expected {
1133        return Err(QueueCodecError::TrailingBytes {
1134            expected,
1135            actual: bytes.len(),
1136        });
1137    }
1138    Ok(SubmitRequest {
1139        command_kind,
1140        payload: bytes[payload_off..expected].to_vec(),
1141    })
1142}
1143
1144/// Encodes a committed-result record into its fixed-width representation.
1145pub fn encode_committed_result_record_fixed(
1146    record: CommittedResultRecord,
1147) -> [u8; COMMITTED_RESULT_RECORD_LEN] {
1148    let mut out = [0u8; COMMITTED_RESULT_RECORD_LEN];
1149    out[0] = COMMITTED_RESULT_RECORD_VERSION;
1150    out[1..9].copy_from_slice(&record.ext_seq.to_le_bytes());
1151    match record.status {
1152        CommittedStatus::Committed { tx_seq } => {
1153            out[COMMITTED_RESULT_STATUS_TAG_OFFSET] = COMMITTED_STATUS_TAG_COMMITTED;
1154            out[10..18].copy_from_slice(&tx_seq.to_le_bytes());
1155        }
1156        CommittedStatus::Rejected { error_code } => {
1157            out[COMMITTED_RESULT_STATUS_TAG_OFFSET] = COMMITTED_STATUS_TAG_REJECTED;
1158            out[10..18].copy_from_slice(&(error_code.to_u16() as u64).to_le_bytes());
1159        }
1160    }
1161    out
1162}
1163
1164/// Encodes a committed-result record.
1165pub fn encode_committed_result_record(record: CommittedResultRecord) -> Vec<u8> {
1166    encode_committed_result_record_fixed(record).to_vec()
1167}
1168
1169/// Decodes a committed-result record.
1170pub fn decode_committed_result_record(
1171    bytes: &[u8],
1172) -> Result<CommittedResultRecord, QueueCodecError> {
1173    if bytes.len() < COMMITTED_RESULT_RECORD_LEN {
1174        return Err(QueueCodecError::Truncated);
1175    }
1176    let version = bytes[0];
1177    if version != COMMITTED_RESULT_RECORD_VERSION {
1178        return Err(QueueCodecError::UnsupportedVersion {
1179            expected: COMMITTED_RESULT_RECORD_VERSION,
1180            found: version,
1181        });
1182    }
1183    if bytes.len() != COMMITTED_RESULT_RECORD_LEN {
1184        return Err(QueueCodecError::TrailingBytes {
1185            expected: COMMITTED_RESULT_RECORD_LEN,
1186            actual: bytes.len(),
1187        });
1188    }
1189    let status_tag = bytes[COMMITTED_RESULT_STATUS_TAG_OFFSET];
1190    let status_value = u64::from_le_bytes(
1191        bytes[10..18]
1192            .try_into()
1193            .map_err(|_| QueueCodecError::Truncated)?,
1194    );
1195    let status = match status_tag {
1196        COMMITTED_STATUS_TAG_COMMITTED => CommittedStatus::Committed {
1197            tx_seq: status_value,
1198        },
1199        COMMITTED_STATUS_TAG_REJECTED => {
1200            let code_u16 =
1201                u16::try_from(status_value).map_err(|_| QueueCodecError::InvalidFieldValue {
1202                    field: "error_code",
1203                    value: status_value,
1204                })?;
1205            let error_code = RejectedErrorCode::from_u16(code_u16).ok_or(
1206                QueueCodecError::InvalidFieldValue {
1207                    field: "error_code",
1208                    value: status_value,
1209                },
1210            )?;
1211            CommittedStatus::Rejected { error_code }
1212        }
1213        other => {
1214            return Err(QueueCodecError::InvalidFieldValue {
1215                field: "status_tag",
1216                value: other as u64,
1217            });
1218        }
1219    };
1220    Ok(CommittedResultRecord {
1221        ext_seq: u64::from_le_bytes(
1222            bytes[1..9]
1223                .try_into()
1224                .map_err(|_| QueueCodecError::Truncated)?,
1225        ),
1226        status,
1227    })
1228}