1pub use statevec_model::command::Command;
12use statevec_model::event::GeneratedEventAccess;
13pub use statevec_model::record::RecordKey;
14use statevec_model::record::{GeneratedRecordAccess, RecordKind, SysId};
15use statevec_model::{CommandDefinition, SchemaRegistry};
16
17mod plugin_abi_v1;
18mod throughput_probe;
19pub use plugin_abi_v1::{
20 ExportedRuntimePluginV1Handle, RUNTIME_PLUGIN_ABI_VERSION_V1, RUNTIME_PLUGIN_ENTRY_V1_SYMBOL,
21 RuntimeBytesMutRef, RuntimeBytesMutVisitor, RuntimeBytesRef, RuntimeBytesVisitor,
22 RuntimeCallStatus, RuntimeCommandView, RuntimeErrorBuf, RuntimeErrorKind, RuntimeErrorPhase,
23 RuntimeHostContextV1, RuntimeHostContextV1Adapter, RuntimeHostVTableV1, RuntimePluginApiV1,
24 RuntimePluginEntryV1, RuntimeReadContextV1, RuntimeReadContextV1Adapter, RuntimeReadVTableV1,
25 RuntimeRecordKeyView, RuntimeRecordKeyVisitor, clear_runtime_error, runtime_bytes_slice,
26 runtime_bytes_slice_mut, runtime_error_kind, runtime_error_message, runtime_error_text,
27 runtime_plugin_create_runtime_v1, runtime_plugin_destroy_runtime_v1, runtime_plugin_name_v1,
28 runtime_plugin_on_unload_v1, runtime_plugin_run_tx_v1, runtime_plugin_schema_bytes_v1,
29 runtime_plugin_validate_biz_invariants_v1, write_runtime_error,
30};
31pub use throughput_probe::RuntimeApiProbe;
32use throughput_probe::{
33 on_runtime_host_update_typed_by_pk, on_runtime_host_with_read_typed_by_pk,
34 on_typed_tx_update_or_create_typed_by_pk, on_typed_tx_update_typed_by_pk,
35 on_typed_tx_with_read_typed_by_pk,
36};
37
38pub const STATEVEC_API_VERSION: &str = "1";
43pub const STATEVEC_API_COMPAT_VERSION: u32 = RUNTIME_PLUGIN_ABI_VERSION_V1;
45
46#[cfg(test)]
47mod ut_api_compat_version {
48 #[test]
49 fn api_compat_version_is_not_the_crate_package_version() {
50 assert_eq!(super::STATEVEC_API_VERSION, "1");
51 assert_eq!(
52 super::STATEVEC_API_COMPAT_VERSION,
53 super::RUNTIME_PLUGIN_ABI_VERSION_V1
54 );
55 assert_ne!(super::STATEVEC_API_VERSION, env!("CARGO_PKG_VERSION"));
56 }
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct RuntimeHostError {
62 pub message: String,
64}
65
66impl RuntimeHostError {
67 pub fn new(message: impl Into<String>) -> Self {
69 Self {
70 message: message.into(),
71 }
72 }
73}
74
75impl std::fmt::Display for RuntimeHostError {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.write_str(&self.message)
78 }
79}
80
81impl std::error::Error for RuntimeHostError {}
82
83#[derive(Debug, Clone, PartialEq, Eq)]
85pub struct RuntimePluginLoadError {
86 pub message: String,
88}
89
90impl RuntimePluginLoadError {
91 pub fn new(message: impl Into<String>) -> Self {
93 Self {
94 message: message.into(),
95 }
96 }
97}
98
99impl std::fmt::Display for RuntimePluginLoadError {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 f.write_str(&self.message)
102 }
103}
104
105impl std::error::Error for RuntimePluginLoadError {}
106
107#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct RuntimePluginError {
110 pub message: String,
112}
113
114impl RuntimePluginError {
115 pub fn new(message: impl Into<String>) -> Self {
117 Self {
118 message: message.into(),
119 }
120 }
121}
122
123impl std::fmt::Display for RuntimePluginError {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.write_str(&self.message)
126 }
127}
128
129impl std::error::Error for RuntimePluginError {}
130
131#[derive(Debug, Clone, PartialEq, Eq)]
133pub struct RuntimePluginUnloadError {
134 pub message: String,
136}
137
138impl RuntimePluginUnloadError {
139 pub fn new(message: impl Into<String>) -> Self {
141 Self {
142 message: message.into(),
143 }
144 }
145}
146
147impl std::fmt::Display for RuntimePluginUnloadError {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.write_str(&self.message)
150 }
151}
152
153impl std::error::Error for RuntimePluginUnloadError {}
154
155pub trait RuntimeHostContext {
161 fn with_read_typed_raw(
163 &self,
164 record_kind: RecordKind,
165 sys_id: SysId,
166 f: &mut dyn FnMut(&[u8]),
167 ) -> Result<bool, RuntimeHostError>;
168
169 fn with_read_typed_by_pk_raw(
171 &self,
172 record_kind: RecordKind,
173 pk: &[u8],
174 f: &mut dyn FnMut(&[u8]),
175 ) -> Result<bool, RuntimeHostError>;
176
177 fn create_typed_raw(
179 &mut self,
180 record_kind: RecordKind,
181 init: &mut dyn FnMut(&mut [u8]),
182 ) -> Result<RecordKey, RuntimeHostError>;
183
184 fn update_typed_by_pk_raw(
186 &mut self,
187 record_kind: RecordKind,
188 pk: &[u8],
189 f: &mut dyn FnMut(&mut [u8]),
190 ) -> Result<bool, RuntimeHostError>;
191
192 fn delete_by_pk_raw(
194 &mut self,
195 record_kind: RecordKind,
196 pk: &[u8],
197 ) -> Result<bool, RuntimeHostError>;
198
199 fn emit_typed_event_raw(
201 &mut self,
202 event_kind: u8,
203 payload: &[u8],
204 ) -> Result<(), RuntimeHostError>;
205
206 fn for_each_record_key_raw(
208 &self,
209 kind: RecordKind,
210 f: &mut dyn FnMut(RecordKey),
211 ) -> Result<(), RuntimeHostError>;
212
213 fn debug_log(&mut self, _message: String) -> Result<(), RuntimeHostError> {
215 Ok(())
216 }
217}
218
219pub trait RuntimeHostContextExt: RuntimeHostContext {
226 fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, RuntimeHostError>
228 where
229 R: GeneratedRecordAccess,
230 F: FnOnce(R::Access<'_>) -> T,
231 {
232 let mut f = Some(f);
233 let mut out = None;
234 let found = self.with_read_typed_raw(R::KIND, sys_id, &mut |data| {
235 let apply = f.take().expect("callback invoked more than once");
236 out = Some(apply(R::wrap(data)));
237 })?;
238 Ok(found.then_some(out).flatten())
239 }
240
241 fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, RuntimeHostError>
247 where
248 R: GeneratedRecordAccess,
249 P: AsRef<[u8]>,
250 F: FnOnce(R::Access<'_>) -> T,
251 {
252 on_runtime_host_with_read_typed_by_pk();
253 let mut f = Some(f);
254 let mut out = None;
255 let found = self.with_read_typed_by_pk_raw(R::KIND, pk.as_ref(), &mut |data| {
256 let apply = f.take().expect("callback invoked more than once");
257 out = Some(apply(R::wrap(data)));
258 })?;
259 Ok(found.then_some(out).flatten())
260 }
261
262 fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, RuntimeHostError>
264 where
265 R: GeneratedRecordAccess,
266 F: for<'b> FnOnce(&mut R::NewBuilder<'b>),
267 {
268 let mut init = Some(init);
269 let key = self.create_typed_raw(R::KIND, &mut |buf| {
270 let apply = init.take().expect("init callback invoked more than once");
271 let mut builder = R::wrap_new(buf);
272 apply(&mut builder);
273 })?;
274 Ok(key)
275 }
276
277 fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, RuntimeHostError>
283 where
284 R: GeneratedRecordAccess,
285 P: AsRef<[u8]>,
286 F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
287 {
288 on_runtime_host_update_typed_by_pk();
289 let mut f = Some(f);
290 let mut out = None;
291 let found = self.update_typed_by_pk_raw(R::KIND, pk.as_ref(), &mut |buf| {
292 let apply = f.take().expect("update callback invoked more than once");
293 let mut builder = R::wrap_update(buf);
294 out = Some(apply(&mut builder));
295 })?;
296 Ok(found.then_some(out).flatten())
297 }
298
299 fn update_or_create_typed_by_pk<R, P, T, FU, FC>(
301 &mut self,
302 pk: P,
303 update: FU,
304 create: FC,
305 ) -> Result<T, RuntimeHostError>
306 where
307 R: GeneratedRecordAccess,
308 P: AsRef<[u8]>,
309 FU: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
310 FC: for<'b> FnOnce(&mut R::NewBuilder<'b>) -> T,
311 {
312 if let Some(value) = self.update_typed_by_pk::<R, P, T, FU>(pk, update)? {
313 return Ok(value);
314 }
315
316 let mut out = None;
317 self.create_typed::<R, _>(|builder| {
318 out = Some(create(builder));
319 })?;
320 Ok(out.expect("create closure must produce a value"))
321 }
322
323 fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, RuntimeHostError>
325 where
326 R: GeneratedRecordAccess,
327 P: AsRef<[u8]>,
328 {
329 self.delete_by_pk_raw(R::KIND, pk.as_ref())
330 }
331
332 fn emit_typed_event<E>(&mut self, payload: Vec<u8>) -> Result<(), RuntimeHostError>
334 where
335 E: GeneratedEventAccess,
336 {
337 self.emit_typed_event_raw(E::KIND, &payload)
338 }
339
340 fn for_each_record_key(
342 &self,
343 kind: RecordKind,
344 f: &mut dyn FnMut(RecordKey),
345 ) -> Result<(), RuntimeHostError> {
346 self.for_each_record_key_raw(kind, &mut |key| f(key))
347 }
348}
349
350impl<T: RuntimeHostContext + ?Sized> RuntimeHostContextExt for T {}
351
352pub trait RuntimePluginFactory {
354 fn plugin_name(&self) -> &'static str;
356
357 fn schema_registry(&self) -> SchemaRegistry;
359
360 fn command_definitions(&self) -> &'static [&'static CommandDefinition] {
362 &[]
363 }
364
365 fn create(
367 &self,
368 plugin_config_text: &str,
369 ) -> Result<Box<dyn RuntimePlugin>, RuntimePluginLoadError>;
370}
371
372pub trait BizInvariantReadContext {
378 fn with_read_typed_raw(
380 &self,
381 record_kind: RecordKind,
382 sys_id: SysId,
383 f: &mut dyn FnMut(&[u8]),
384 ) -> Result<bool, RuntimeHostError>;
385
386 fn with_read_typed_by_pk_raw(
388 &self,
389 record_kind: RecordKind,
390 pk: &[u8],
391 f: &mut dyn FnMut(&[u8]),
392 ) -> Result<bool, RuntimeHostError>;
393
394 fn for_each_record_key_raw(
396 &self,
397 kind: RecordKind,
398 f: &mut dyn FnMut(RecordKey),
399 ) -> Result<(), RuntimeHostError>;
400}
401
402pub trait InvariantReadContextExt: BizInvariantReadContext {
404 fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, RuntimeHostError>
406 where
407 R: GeneratedRecordAccess,
408 F: FnOnce(R::Access<'_>) -> T,
409 {
410 let mut result = None;
411 let mut f = Some(f);
412 let found = self.with_read_typed_raw(R::KIND, sys_id, &mut |data| {
413 if let Some(f) = f.take() {
414 result = Some(f(R::wrap(data)));
415 }
416 })?;
417 if found { Ok(result) } else { Ok(None) }
418 }
419
420 fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, RuntimeHostError>
422 where
423 R: GeneratedRecordAccess,
424 P: AsRef<[u8]>,
425 F: FnOnce(R::Access<'_>) -> T,
426 {
427 let mut result = None;
428 let mut f = Some(f);
429 let found = self.with_read_typed_by_pk_raw(R::KIND, pk.as_ref(), &mut |data| {
430 if let Some(f) = f.take() {
431 result = Some(f(R::wrap(data)));
432 }
433 })?;
434 if found { Ok(result) } else { Ok(None) }
435 }
436}
437
438impl<T: BizInvariantReadContext + ?Sized> InvariantReadContextExt for T {}
439
440pub trait RuntimePlugin {
442 fn name(&self) -> &'static str;
444
445 fn schema_registry(&self) -> SchemaRegistry;
447
448 fn command_definitions(&self) -> &'static [&'static CommandDefinition] {
450 &[]
451 }
452
453 fn run_tx(
455 &self,
456 tx: &mut dyn RuntimeHostContext,
457 command: &dyn RuntimeCommandEnvelope,
458 ) -> Result<(), RuntimePluginError>;
459
460 fn validate_biz_invariants(&self, _ctx: &dyn BizInvariantReadContext) -> Result<(), String> {
469 Ok(())
470 }
471
472 fn on_unload(&mut self) -> Result<(), RuntimePluginUnloadError> {
474 Ok(())
475 }
476}
477
478pub trait RuntimeCommandEnvelope {
480 fn command_kind(&self) -> u8;
482 fn ext_seq(&self) -> u64;
484 fn ref_ext_time_us(&self) -> u64;
486 fn payload(&self) -> &[u8];
488}
489
490#[derive(Debug, Clone, Copy)]
492pub struct RuntimeCommandRef<'a> {
493 command_kind: u8,
494 ext_seq: u64,
495 ref_ext_time_us: u64,
496 payload: &'a [u8],
497}
498
499impl<'a> RuntimeCommandRef<'a> {
500 #[inline]
502 pub fn new(command_kind: u8, ext_seq: u64, ref_ext_time_us: u64, payload: &'a [u8]) -> Self {
503 Self {
504 command_kind,
505 ext_seq,
506 ref_ext_time_us,
507 payload,
508 }
509 }
510}
511
512impl RuntimeCommandEnvelope for RuntimeCommandRef<'_> {
513 #[inline(always)]
514 fn command_kind(&self) -> u8 {
515 self.command_kind
516 }
517
518 #[inline(always)]
519 fn ext_seq(&self) -> u64 {
520 self.ext_seq
521 }
522
523 #[inline(always)]
524 fn ref_ext_time_us(&self) -> u64 {
525 self.ref_ext_time_us
526 }
527
528 #[inline(always)]
529 fn payload(&self) -> &[u8] {
530 self.payload
531 }
532}
533
534impl RuntimeCommandEnvelope for Command {
535 #[inline(always)]
536 fn command_kind(&self) -> u8 {
537 self.command_kind()
538 }
539
540 #[inline(always)]
541 fn ext_seq(&self) -> u64 {
542 self.ext_seq()
543 }
544
545 #[inline(always)]
546 fn ref_ext_time_us(&self) -> u64 {
547 self.ref_ext_time_us()
548 }
549
550 #[inline(always)]
551 fn payload(&self) -> &[u8] {
552 self.payload()
553 }
554}
555
556pub trait TxReadContext {
558 type Error;
560
561 fn with_read_raw<T>(
563 &self,
564 key: RecordKey,
565 f: impl FnOnce(&[u8]) -> T,
566 ) -> Result<Option<T>, Self::Error>;
567 fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey));
569}
570
571pub trait TxPkContext: TxReadContext {
573 fn resolve_pk(&self, kind: RecordKind, pk: &[u8]) -> Result<Option<SysId>, Self::Error>;
575}
576
577pub trait TxWriteContext: TxReadContext {
579 fn create_raw(&mut self, kind: RecordKind, data: Vec<u8>) -> Result<RecordKey, Self::Error>;
581 fn update_raw<T>(
583 &mut self,
584 key: RecordKey,
585 f: impl FnOnce(&mut [u8]) -> T,
586 ) -> Result<Option<T>, Self::Error>;
587 fn delete_raw(&mut self, key: RecordKey) -> Result<bool, Self::Error>;
589 fn emit_event_raw(&mut self, event_kind: u8, payload: Vec<u8>);
591 fn debug_log(&mut self, _message: String) {}
593}
594
595pub trait TxSysIdCreateContext: TxWriteContext {
597 fn create_with_sys_id_raw(
599 &mut self,
600 kind: RecordKind,
601 sys_id: SysId,
602 data: Vec<u8>,
603 ) -> Result<RecordKey, Self::Error>;
604}
605
606pub trait TxContext: TxPkContext + TxSysIdCreateContext {}
608
609impl<T: TxPkContext + TxSysIdCreateContext + ?Sized> TxContext for T {}
610
611pub trait TypedTxContext {
613 type Error;
615
616 fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, Self::Error>
618 where
619 R: GeneratedRecordAccess,
620 F: FnOnce(R::Access<'_>) -> T;
621
622 fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, Self::Error>
629 where
630 R: GeneratedRecordAccess,
631 P: AsRef<[u8]>,
632 F: FnOnce(R::Access<'_>) -> T;
633
634 fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, Self::Error>
636 where
637 R: GeneratedRecordAccess,
638 F: for<'b> FnOnce(&mut R::NewBuilder<'b>);
639
640 fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, Self::Error>
646 where
647 R: GeneratedRecordAccess,
648 P: AsRef<[u8]>,
649 F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T;
650
651 fn update_or_create_typed_by_pk<R, P, T, FU, FC>(
653 &mut self,
654 pk: P,
655 update: FU,
656 create: FC,
657 ) -> Result<T, Self::Error>
658 where
659 R: GeneratedRecordAccess,
660 P: AsRef<[u8]>,
661 FU: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
662 FC: for<'b> FnOnce(&mut R::NewBuilder<'b>) -> T,
663 {
664 if let Some(value) = self.update_typed_by_pk::<R, P, T, FU>(pk, update)? {
665 return Ok(value);
666 }
667 let mut out = None;
668 self.create_typed::<R, _>(|builder| {
669 out = Some(create(builder));
670 })?;
671 Ok(out.expect("create closure must produce a value"))
672 }
673
674 fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, Self::Error>
676 where
677 R: GeneratedRecordAccess,
678 P: AsRef<[u8]>;
679
680 fn emit_typed_event<E>(&mut self, payload: Vec<u8>)
682 where
683 E: GeneratedEventAccess;
684
685 fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey));
687
688 fn debug_log(&mut self, _message: String) {}
690}
691
692impl<Ctx: TxContext + ?Sized> TypedTxContext for Ctx {
693 type Error = <Ctx as TxReadContext>::Error;
694
695 fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, Self::Error>
696 where
697 R: GeneratedRecordAccess,
698 F: FnOnce(R::Access<'_>) -> T,
699 {
700 TxReadContext::with_read_raw(
701 self,
702 RecordKey {
703 kind: R::KIND,
704 sys_id,
705 },
706 |data| f(R::wrap(data)),
707 )
708 }
709
710 fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, Self::Error>
711 where
712 R: GeneratedRecordAccess,
713 P: AsRef<[u8]>,
714 F: FnOnce(R::Access<'_>) -> T,
715 {
716 on_typed_tx_with_read_typed_by_pk();
717 let Some(sys_id) = TxPkContext::resolve_pk(self, R::KIND, pk.as_ref())? else {
718 return Ok(None);
719 };
720 TypedTxContext::with_read_typed::<R, T, F>(self, sys_id, f)
721 }
722
723 fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, Self::Error>
724 where
725 R: GeneratedRecordAccess,
726 F: for<'b> FnOnce(&mut R::NewBuilder<'b>),
727 {
728 let mut data = vec![0u8; R::DATA_LEN];
729 init(&mut R::wrap_new(&mut data));
730 TxWriteContext::create_raw(self, R::KIND, data)
731 }
732
733 fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, Self::Error>
734 where
735 R: GeneratedRecordAccess,
736 P: AsRef<[u8]>,
737 F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
738 {
739 on_typed_tx_update_typed_by_pk();
740 let Some(sys_id) = TxPkContext::resolve_pk(self, R::KIND, pk.as_ref())? else {
741 return Ok(None);
742 };
743 TxWriteContext::update_raw(
744 self,
745 RecordKey {
746 kind: R::KIND,
747 sys_id,
748 },
749 |data| {
750 let mut builder = R::wrap_update(data);
751 f(&mut builder)
752 },
753 )
754 }
755
756 fn update_or_create_typed_by_pk<R, P, T, FU, FC>(
757 &mut self,
758 pk: P,
759 update: FU,
760 create: FC,
761 ) -> Result<T, Self::Error>
762 where
763 R: GeneratedRecordAccess,
764 P: AsRef<[u8]>,
765 FU: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
766 FC: for<'b> FnOnce(&mut R::NewBuilder<'b>) -> T,
767 {
768 on_typed_tx_update_or_create_typed_by_pk();
769 if let Some(value) = self.update_typed_by_pk::<R, P, T, FU>(pk, update)? {
770 return Ok(value);
771 }
772
773 let mut out = None;
774 self.create_typed::<R, _>(|builder| {
775 out = Some(create(builder));
776 })?;
777 Ok(out.expect("create closure must produce a value"))
778 }
779
780 fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, Self::Error>
781 where
782 R: GeneratedRecordAccess,
783 P: AsRef<[u8]>,
784 {
785 let Some(sys_id) = TxPkContext::resolve_pk(self, R::KIND, pk.as_ref())? else {
786 return Ok(false);
787 };
788 TxWriteContext::delete_raw(
789 self,
790 RecordKey {
791 kind: R::KIND,
792 sys_id,
793 },
794 )
795 }
796
797 fn emit_typed_event<E>(&mut self, payload: Vec<u8>)
798 where
799 E: GeneratedEventAccess,
800 {
801 TxWriteContext::emit_event_raw(self, E::KIND, payload);
802 }
803
804 fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey)) {
805 TxReadContext::for_each_record_key(self, kind, f);
806 }
807
808 fn debug_log(&mut self, message: String) {
809 TxWriteContext::debug_log(self, message);
810 }
811}
812
813impl TypedTxContext for dyn RuntimeHostContext + '_ {
817 type Error = RuntimeHostError;
818
819 fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, Self::Error>
820 where
821 R: GeneratedRecordAccess,
822 F: FnOnce(R::Access<'_>) -> T,
823 {
824 RuntimeHostContextExt::with_read_typed::<R, T, F>(self, sys_id, f)
825 }
826
827 fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, Self::Error>
828 where
829 R: GeneratedRecordAccess,
830 P: AsRef<[u8]>,
831 F: FnOnce(R::Access<'_>) -> T,
832 {
833 RuntimeHostContextExt::with_read_typed_by_pk::<R, P, T, F>(self, pk, f)
834 }
835
836 fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, Self::Error>
837 where
838 R: GeneratedRecordAccess,
839 F: for<'b> FnOnce(&mut R::NewBuilder<'b>),
840 {
841 RuntimeHostContextExt::create_typed::<R, F>(self, init)
842 }
843
844 fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, Self::Error>
845 where
846 R: GeneratedRecordAccess,
847 P: AsRef<[u8]>,
848 F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
849 {
850 RuntimeHostContextExt::update_typed_by_pk::<R, P, T, F>(self, pk, f)
851 }
852
853 fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, Self::Error>
854 where
855 R: GeneratedRecordAccess,
856 P: AsRef<[u8]>,
857 {
858 RuntimeHostContextExt::delete_by_pk::<R, P>(self, pk)
859 }
860
861 fn emit_typed_event<E>(&mut self, payload: Vec<u8>)
862 where
863 E: GeneratedEventAccess,
864 {
865 RuntimeHostContextExt::emit_typed_event::<E>(self, payload)
866 .expect("host emit_typed_event_raw failed");
867 }
868
869 fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey)) {
870 let _ = RuntimeHostContextExt::for_each_record_key(self, kind, f);
871 }
872
873 fn debug_log(&mut self, message: String) {
874 let _ = RuntimeHostContext::debug_log(self, message);
875 }
876}
877
878pub trait QueueProducer {
883 type Error;
884
885 fn append(&mut self, record: &[u8]) -> Result<u64, Self::Error>;
886}
887
888#[derive(Debug, Clone, PartialEq, Eq)]
894pub struct QueueRecord<T> {
895 pub ext_seq: u64,
897 pub ref_ext_time_us: u64,
899 pub record: T,
901}
902
903pub trait QueueConsumer {
905 type Record;
907 type Error;
909
910 fn poll(&mut self) -> Result<Option<QueueRecord<Self::Record>>, Self::Error>;
912 fn commit_through(&mut self, ext_seq: u64) -> Result<(), Self::Error>;
914}
915
916pub trait QueueConsumerResume {
918 type Error;
920
921 fn resume_next_ext_seq(&mut self) -> Result<Option<u64>, Self::Error>;
923}
924
925pub trait CommittedResultQuery {
927 type Error;
929
930 fn query_committed_by_ext_seq(
932 &mut self,
933 ext_seq: u64,
934 ) -> Result<Option<CommittedStatus>, Self::Error>;
935}
936
937impl<F, E> CommittedResultQuery for F
938where
939 F: FnMut(u64) -> Result<Option<CommittedStatus>, E>,
940{
941 type Error = E;
942
943 fn query_committed_by_ext_seq(
944 &mut self,
945 ext_seq: u64,
946 ) -> Result<Option<CommittedStatus>, Self::Error> {
947 self(ext_seq)
948 }
949}
950
951pub const SUBMIT_REQUEST_RECORD_VERSION: u8 = 1;
953const SUBMIT_REQUEST_HEADER_LEN: usize = 6;
955const SUBMIT_REQUEST_PAYLOAD_LEN_OFFSET: usize = 2;
957
958pub const COMMITTED_RESULT_RECORD_VERSION: u8 = 2;
960pub const COMMITTED_RESULT_RECORD_LEN: usize = 18;
962const COMMITTED_RESULT_STATUS_TAG_OFFSET: usize = 9;
963const COMMITTED_STATUS_TAG_COMMITTED: u8 = 1;
964const COMMITTED_STATUS_TAG_REJECTED: u8 = 2;
965
966#[derive(Debug, Clone, PartialEq, Eq)]
968pub struct SubmitRequest {
969 pub command_kind: u8,
971 pub payload: Vec<u8>,
973}
974
975impl SubmitRequest {
976 pub fn new(command_kind: u8, payload: Vec<u8>) -> Self {
978 Self {
979 command_kind,
980 payload,
981 }
982 }
983}
984
985#[derive(Debug, Clone, Copy, PartialEq, Eq)]
987pub struct QueueReceipt {
988 pub ext_seq: u64,
990}
991
992#[derive(Debug, Clone, Copy, PartialEq, Eq)]
994pub struct CommittedReceipt {
995 pub ext_seq: u64,
998 pub status: CommittedStatus,
1000}
1001
1002#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1004pub struct CommittedResultRecord {
1005 pub ext_seq: u64,
1008 pub status: CommittedStatus,
1010}
1011
1012#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1017#[repr(u16)]
1018pub enum RejectedErrorCode {
1019 CommandRejected = 1,
1021 RuntimePanic = 103,
1023}
1024
1025impl RejectedErrorCode {
1026 pub fn to_u16(self) -> u16 {
1028 self as u16
1029 }
1030
1031 pub fn from_u16(value: u16) -> Option<Self> {
1033 match value {
1034 1 => Some(Self::CommandRejected),
1035 103 => Some(Self::RuntimePanic),
1036 _ => None,
1037 }
1038 }
1039}
1040
1041#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1043pub enum CommittedStatus {
1044 Committed { tx_seq: u64 },
1046 Rejected { error_code: RejectedErrorCode },
1049}
1050
1051#[derive(Debug, Clone, PartialEq, Eq)]
1053pub enum QueueCodecError {
1054 Truncated,
1056 TrailingBytes { expected: usize, actual: usize },
1058 FieldTooLarge { field: &'static str, len: u64 },
1060 UnsupportedVersion { expected: u8, found: u8 },
1062 InvalidFieldValue { field: &'static str, value: u64 },
1064}
1065
1066impl std::fmt::Display for QueueCodecError {
1067 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1068 match self {
1069 Self::Truncated => write!(f, "truncated queue record"),
1070 Self::TrailingBytes { expected, actual } => {
1071 write!(
1072 f,
1073 "queue record has trailing bytes: expected {expected}, actual {actual}"
1074 )
1075 }
1076 Self::FieldTooLarge { field, len } => {
1077 write!(f, "queue record field '{field}' too large: {len} bytes")
1078 }
1079 Self::UnsupportedVersion { expected, found } => {
1080 write!(
1081 f,
1082 "unsupported queue record version: expected {expected}, found {found}"
1083 )
1084 }
1085 Self::InvalidFieldValue { field, value } => {
1086 write!(f, "invalid value for queue record field '{field}': {value}")
1087 }
1088 }
1089 }
1090}
1091
1092impl std::error::Error for QueueCodecError {}
1093
1094pub fn encode_submit_request(request: &SubmitRequest) -> Result<Vec<u8>, QueueCodecError> {
1096 let payload_len =
1097 u32::try_from(request.payload.len()).map_err(|_| QueueCodecError::FieldTooLarge {
1098 field: "payload",
1099 len: request.payload.len() as u64,
1100 })?;
1101 let mut out = Vec::with_capacity(SUBMIT_REQUEST_HEADER_LEN + request.payload.len());
1102 out.push(SUBMIT_REQUEST_RECORD_VERSION);
1103 out.push(request.command_kind);
1104 out.extend_from_slice(&payload_len.to_le_bytes());
1105 out.extend_from_slice(&request.payload);
1106 Ok(out)
1107}
1108
1109pub fn decode_submit_request(bytes: &[u8]) -> Result<SubmitRequest, QueueCodecError> {
1111 if bytes.len() < SUBMIT_REQUEST_HEADER_LEN {
1112 return Err(QueueCodecError::Truncated);
1113 }
1114 let version = bytes[0];
1115 if version != SUBMIT_REQUEST_RECORD_VERSION {
1116 return Err(QueueCodecError::UnsupportedVersion {
1117 expected: SUBMIT_REQUEST_RECORD_VERSION,
1118 found: version,
1119 });
1120 }
1121 let command_kind = bytes[1];
1122 let payload_len = u32::from_le_bytes(
1123 bytes[SUBMIT_REQUEST_PAYLOAD_LEN_OFFSET..SUBMIT_REQUEST_PAYLOAD_LEN_OFFSET + 4]
1124 .try_into()
1125 .map_err(|_| QueueCodecError::Truncated)?,
1126 ) as usize;
1127 let payload_off = SUBMIT_REQUEST_HEADER_LEN;
1128 let expected = payload_off + payload_len;
1129 if bytes.len() < expected {
1130 return Err(QueueCodecError::Truncated);
1131 }
1132 if bytes.len() != expected {
1133 return Err(QueueCodecError::TrailingBytes {
1134 expected,
1135 actual: bytes.len(),
1136 });
1137 }
1138 Ok(SubmitRequest {
1139 command_kind,
1140 payload: bytes[payload_off..expected].to_vec(),
1141 })
1142}
1143
1144pub fn encode_committed_result_record_fixed(
1146 record: CommittedResultRecord,
1147) -> [u8; COMMITTED_RESULT_RECORD_LEN] {
1148 let mut out = [0u8; COMMITTED_RESULT_RECORD_LEN];
1149 out[0] = COMMITTED_RESULT_RECORD_VERSION;
1150 out[1..9].copy_from_slice(&record.ext_seq.to_le_bytes());
1151 match record.status {
1152 CommittedStatus::Committed { tx_seq } => {
1153 out[COMMITTED_RESULT_STATUS_TAG_OFFSET] = COMMITTED_STATUS_TAG_COMMITTED;
1154 out[10..18].copy_from_slice(&tx_seq.to_le_bytes());
1155 }
1156 CommittedStatus::Rejected { error_code } => {
1157 out[COMMITTED_RESULT_STATUS_TAG_OFFSET] = COMMITTED_STATUS_TAG_REJECTED;
1158 out[10..18].copy_from_slice(&(error_code.to_u16() as u64).to_le_bytes());
1159 }
1160 }
1161 out
1162}
1163
1164pub fn encode_committed_result_record(record: CommittedResultRecord) -> Vec<u8> {
1166 encode_committed_result_record_fixed(record).to_vec()
1167}
1168
1169pub fn decode_committed_result_record(
1171 bytes: &[u8],
1172) -> Result<CommittedResultRecord, QueueCodecError> {
1173 if bytes.len() < COMMITTED_RESULT_RECORD_LEN {
1174 return Err(QueueCodecError::Truncated);
1175 }
1176 let version = bytes[0];
1177 if version != COMMITTED_RESULT_RECORD_VERSION {
1178 return Err(QueueCodecError::UnsupportedVersion {
1179 expected: COMMITTED_RESULT_RECORD_VERSION,
1180 found: version,
1181 });
1182 }
1183 if bytes.len() != COMMITTED_RESULT_RECORD_LEN {
1184 return Err(QueueCodecError::TrailingBytes {
1185 expected: COMMITTED_RESULT_RECORD_LEN,
1186 actual: bytes.len(),
1187 });
1188 }
1189 let status_tag = bytes[COMMITTED_RESULT_STATUS_TAG_OFFSET];
1190 let status_value = u64::from_le_bytes(
1191 bytes[10..18]
1192 .try_into()
1193 .map_err(|_| QueueCodecError::Truncated)?,
1194 );
1195 let status = match status_tag {
1196 COMMITTED_STATUS_TAG_COMMITTED => CommittedStatus::Committed {
1197 tx_seq: status_value,
1198 },
1199 COMMITTED_STATUS_TAG_REJECTED => {
1200 let code_u16 =
1201 u16::try_from(status_value).map_err(|_| QueueCodecError::InvalidFieldValue {
1202 field: "error_code",
1203 value: status_value,
1204 })?;
1205 let error_code = RejectedErrorCode::from_u16(code_u16).ok_or(
1206 QueueCodecError::InvalidFieldValue {
1207 field: "error_code",
1208 value: status_value,
1209 },
1210 )?;
1211 CommittedStatus::Rejected { error_code }
1212 }
1213 other => {
1214 return Err(QueueCodecError::InvalidFieldValue {
1215 field: "status_tag",
1216 value: other as u64,
1217 });
1218 }
1219 };
1220 Ok(CommittedResultRecord {
1221 ext_seq: u64::from_le_bytes(
1222 bytes[1..9]
1223 .try_into()
1224 .map_err(|_| QueueCodecError::Truncated)?,
1225 ),
1226 status,
1227 })
1228}