pub use statevec_model::command::Command;
use statevec_model::event::GeneratedEventAccess;
pub use statevec_model::record::RecordKey;
use statevec_model::record::{GeneratedRecordAccess, RecordKind, SysId};
use statevec_model::{CommandDefinition, SchemaRegistry};
mod plugin_abi_v1;
mod throughput_probe;
pub use plugin_abi_v1::{
ExportedRuntimePluginV1Handle, RUNTIME_PLUGIN_ABI_VERSION_V1, RUNTIME_PLUGIN_ENTRY_V1_SYMBOL,
RuntimeBytesMutRef, RuntimeBytesMutVisitor, RuntimeBytesRef, RuntimeBytesVisitor,
RuntimeCallStatus, RuntimeCommandView, RuntimeErrorBuf, RuntimeErrorKind, RuntimeErrorPhase,
RuntimeHostContextV1, RuntimeHostContextV1Adapter, RuntimeHostVTableV1, RuntimePluginApiV1,
RuntimePluginEntryV1, RuntimeReadContextV1, RuntimeReadContextV1Adapter, RuntimeReadVTableV1,
RuntimeRecordKeyView, RuntimeRecordKeyVisitor, clear_runtime_error, runtime_bytes_slice,
runtime_bytes_slice_mut, runtime_error_kind, runtime_error_message, runtime_error_text,
runtime_plugin_create_runtime_v1, runtime_plugin_destroy_runtime_v1, runtime_plugin_name_v1,
runtime_plugin_on_unload_v1, runtime_plugin_run_tx_v1, runtime_plugin_schema_bytes_v1,
runtime_plugin_validate_biz_invariants_v1, write_runtime_error,
};
pub use throughput_probe::RuntimeApiProbe;
use throughput_probe::{
on_runtime_host_update_typed_by_pk, on_runtime_host_with_read_typed_by_pk,
on_typed_tx_update_or_create_typed_by_pk, on_typed_tx_update_typed_by_pk,
on_typed_tx_with_read_typed_by_pk,
};
/// API version of this crate, exposed as a string (currently `"1"`).
///
/// The unit test below asserts that this is not the Cargo package version.
pub const STATEVEC_API_VERSION: &str = "1";
/// Numeric compatibility version; defined as `RUNTIME_PLUGIN_ABI_VERSION_V1`.
pub const STATEVEC_API_COMPAT_VERSION: u32 = RUNTIME_PLUGIN_ABI_VERSION_V1;
#[cfg(test)]
mod ut_api_compat_version {
    use super::{
        RUNTIME_PLUGIN_ABI_VERSION_V1, STATEVEC_API_COMPAT_VERSION, STATEVEC_API_VERSION,
    };

    /// Pins the public version constants and checks that the API version
    /// string is decoupled from the Cargo package version.
    #[test]
    fn api_compat_version_is_not_the_crate_package_version() {
        assert_eq!(STATEVEC_API_VERSION, "1");
        assert_eq!(STATEVEC_API_COMPAT_VERSION, RUNTIME_PLUGIN_ABI_VERSION_V1);

        let package_version = env!("CARGO_PKG_VERSION");
        assert_ne!(STATEVEC_API_VERSION, package_version);
    }
}
/// Error produced by host-context operations (the `Err` type of every
/// `RuntimeHostContext` method).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimeHostError {
    /// Human-readable description of the failure.
    pub message: String,
}

impl RuntimeHostError {
    /// Builds an error from anything convertible into a `String`.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }
}

impl std::fmt::Display for RuntimeHostError {
    /// Writes the stored message verbatim; formatter flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.message.as_str())
    }
}

impl std::error::Error for RuntimeHostError {}
/// Error returned when a plugin instance cannot be created
/// (the `Err` type of `RuntimePluginFactory::create`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimePluginLoadError {
    /// Human-readable description of the failure.
    pub message: String,
}

impl RuntimePluginLoadError {
    /// Builds an error from anything convertible into a `String`.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }
}

impl std::fmt::Display for RuntimePluginLoadError {
    /// Writes the stored message verbatim; formatter flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.message.as_str())
    }
}

impl std::error::Error for RuntimePluginLoadError {}
/// Error returned by a plugin while executing a transaction
/// (the `Err` type of `RuntimePlugin::run_tx`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimePluginError {
    /// Human-readable description of the failure.
    pub message: String,
}

impl RuntimePluginError {
    /// Builds an error from anything convertible into a `String`.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }
}

impl std::fmt::Display for RuntimePluginError {
    /// Writes the stored message verbatim; formatter flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.message.as_str())
    }
}

impl std::error::Error for RuntimePluginError {}
/// Error returned when a plugin fails to unload
/// (the `Err` type of `RuntimePlugin::on_unload`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimePluginUnloadError {
    /// Human-readable description of the failure.
    pub message: String,
}

impl RuntimePluginUnloadError {
    /// Builds an error from anything convertible into a `String`.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }
}

impl std::fmt::Display for RuntimePluginUnloadError {
    /// Writes the stored message verbatim; formatter flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.message.as_str())
    }
}

impl std::error::Error for RuntimePluginUnloadError {}
/// Byte-level host interface handed to a plugin while it runs a transaction
/// (`RuntimePlugin::run_tx` receives `&mut dyn RuntimeHostContext`).
///
/// All methods exchange raw record/event bytes and take `dyn` callbacks;
/// the generic, typed wrappers are provided by `RuntimeHostContextExt`.
pub trait RuntimeHostContext {
/// Hands the bytes of the record addressed by `record_kind` + `sys_id` to `f`.
///
/// The typed wrappers interpret the returned `bool` as "record found" and
/// expect `f` to be invoked at most once.
fn with_read_typed_raw(
&self,
record_kind: RecordKind,
sys_id: SysId,
f: &mut dyn FnMut(&[u8]),
) -> Result<bool, RuntimeHostError>;
/// Same as `with_read_typed_raw`, but the record is addressed by the
/// primary-key bytes `pk` instead of a `SysId`.
fn with_read_typed_by_pk_raw(
&self,
record_kind: RecordKind,
pk: &[u8],
f: &mut dyn FnMut(&[u8]),
) -> Result<bool, RuntimeHostError>;
/// Creates a record of `record_kind`; `init` receives the mutable byte
/// buffer of the new record (the typed wrapper passes it to `R::wrap_new`).
///
/// Returns the key of the created record.
fn create_typed_raw(
&mut self,
record_kind: RecordKind,
init: &mut dyn FnMut(&mut [u8]),
) -> Result<RecordKey, RuntimeHostError>;
/// Lets `f` mutate the bytes of the record addressed by `pk`.
///
/// The typed wrappers interpret the returned `bool` as "record found".
fn update_typed_by_pk_raw(
&mut self,
record_kind: RecordKind,
pk: &[u8],
f: &mut dyn FnMut(&mut [u8]),
) -> Result<bool, RuntimeHostError>;
/// Deletes the record addressed by `pk`.
// NOTE(review): the `bool` presumably reports whether a record existed — confirm with host impls.
fn delete_by_pk_raw(
&mut self,
record_kind: RecordKind,
pk: &[u8],
) -> Result<bool, RuntimeHostError>;
/// Emits an event of kind `event_kind` carrying `payload`.
fn emit_typed_event_raw(
&mut self,
event_kind: u8,
payload: &[u8],
) -> Result<(), RuntimeHostError>;
/// Invokes `f` with record keys of the given `kind`.
// NOTE(review): presumably once per stored record of that kind; iteration order is not visible here.
fn for_each_record_key_raw(
&self,
kind: RecordKind,
f: &mut dyn FnMut(RecordKey),
) -> Result<(), RuntimeHostError>;
/// Optional debug logging hook; the default implementation discards the
/// message and returns `Ok(())`.
fn debug_log(&mut self, _message: String) -> Result<(), RuntimeHostError> {
Ok(())
}
}
/// Typed convenience layer over [`RuntimeHostContext`].
///
/// Every method adapts a generic `FnOnce` callback to the `&mut dyn FnMut`
/// callback expected by the corresponding `*_raw` host method and wraps the
/// raw bytes with the accessors/builders of `R: GeneratedRecordAccess`.
pub trait RuntimeHostContextExt: RuntimeHostContext {
    /// Reads the record `R` identified by `sys_id` and returns `f`'s result.
    ///
    /// Returns `Ok(None)` when the host reports the record as not found.
    ///
    /// # Panics
    /// Panics if the host invokes the read callback more than once.
    fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, RuntimeHostError>
    where
        R: GeneratedRecordAccess,
        F: FnOnce(R::Access<'_>) -> T,
    {
        // `f` is FnOnce but the host takes FnMut, so park it in an Option.
        let mut pending = Some(f);
        let mut result = None;
        let found = self.with_read_typed_raw(R::KIND, sys_id, &mut |bytes| {
            let callback = pending.take().expect("callback invoked more than once");
            result = Some(callback(R::wrap(bytes)));
        })?;
        Ok(if found { result } else { None })
    }

    /// Reads the record `R` addressed by primary key `pk` and returns `f`'s
    /// result, or `Ok(None)` when the host reports it as not found.
    ///
    /// # Panics
    /// Panics if the host invokes the read callback more than once.
    fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, RuntimeHostError>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
        F: FnOnce(R::Access<'_>) -> T,
    {
        on_runtime_host_with_read_typed_by_pk();
        let mut pending = Some(f);
        let mut result = None;
        let found = self.with_read_typed_by_pk_raw(R::KIND, pk.as_ref(), &mut |bytes| {
            let callback = pending.take().expect("callback invoked more than once");
            result = Some(callback(R::wrap(bytes)));
        })?;
        Ok(if found { result } else { None })
    }

    /// Creates a record `R`, letting `init` populate it through
    /// `R::NewBuilder`, and returns the new record's key.
    ///
    /// # Panics
    /// Panics if the host invokes the init callback more than once.
    fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, RuntimeHostError>
    where
        R: GeneratedRecordAccess,
        F: for<'b> FnOnce(&mut R::NewBuilder<'b>),
    {
        let mut pending = Some(init);
        self.create_typed_raw(R::KIND, &mut |bytes| {
            let populate = pending
                .take()
                .expect("init callback invoked more than once");
            let mut builder = R::wrap_new(bytes);
            populate(&mut builder);
        })
    }

    /// Updates the record `R` addressed by `pk` through `R::UpdateBuilder`
    /// and returns `f`'s result, or `Ok(None)` when the host reports the
    /// record as not found.
    ///
    /// # Panics
    /// Panics if the host invokes the update callback more than once.
    fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, RuntimeHostError>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
        F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
    {
        on_runtime_host_update_typed_by_pk();
        let mut pending = Some(f);
        let mut result = None;
        let found = self.update_typed_by_pk_raw(R::KIND, pk.as_ref(), &mut |bytes| {
            let mutate = pending
                .take()
                .expect("update callback invoked more than once");
            let mut builder = R::wrap_update(bytes);
            result = Some(mutate(&mut builder));
        })?;
        Ok(if found { result } else { None })
    }

    /// Runs `update` on the record addressed by `pk` if it exists; otherwise
    /// creates a new record and runs `create` on its builder.
    ///
    /// Note that `pk` is only used for the lookup; it is not passed to the
    /// create path.
    ///
    /// # Panics
    /// Panics if the host reports a successful create without having invoked
    /// the init callback.
    fn update_or_create_typed_by_pk<R, P, T, FU, FC>(
        &mut self,
        pk: P,
        update: FU,
        create: FC,
    ) -> Result<T, RuntimeHostError>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
        FU: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
        FC: for<'b> FnOnce(&mut R::NewBuilder<'b>) -> T,
    {
        match self.update_typed_by_pk::<R, P, T, FU>(pk, update)? {
            Some(updated) => Ok(updated),
            None => {
                let mut created = None;
                self.create_typed::<R, _>(|builder| {
                    created = Some(create(builder));
                })?;
                Ok(created.expect("create closure must produce a value"))
            }
        }
    }

    /// Deletes the record `R` addressed by `pk`; forwards the host's result.
    fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, RuntimeHostError>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
    {
        let pk_bytes = pk.as_ref();
        self.delete_by_pk_raw(R::KIND, pk_bytes)
    }

    /// Emits `payload` as an event of kind `E::KIND`.
    fn emit_typed_event<E>(&mut self, payload: Vec<u8>) -> Result<(), RuntimeHostError>
    where
        E: GeneratedEventAccess,
    {
        self.emit_typed_event_raw(E::KIND, payload.as_slice())
    }

    /// Forwards to `for_each_record_key_raw` with the same callback.
    fn for_each_record_key(
        &self,
        kind: RecordKind,
        f: &mut dyn FnMut(RecordKey),
    ) -> Result<(), RuntimeHostError> {
        self.for_each_record_key_raw(kind, f)
    }
}

/// Every `RuntimeHostContext` (including trait objects) gets the typed helpers.
impl<T: RuntimeHostContext + ?Sized> RuntimeHostContextExt for T {}
/// Factory that describes a plugin and creates `RuntimePlugin` instances.
pub trait RuntimePluginFactory {
/// Static name of the plugin.
fn plugin_name(&self) -> &'static str;
/// Schema registry describing the plugin's data model.
fn schema_registry(&self) -> SchemaRegistry;
/// Command definitions exposed by the plugin; the default is an empty slice.
fn command_definitions(&self) -> &'static [&'static CommandDefinition] {
&[]
}
/// Creates a plugin instance from `plugin_config_text`.
///
/// The format of the configuration text is defined by the implementor.
fn create(
&self,
plugin_config_text: &str,
) -> Result<Box<dyn RuntimePlugin>, RuntimePluginLoadError>;
}
/// Read-only, byte-level view passed to
/// `RuntimePlugin::validate_biz_invariants`.
///
/// Mirrors the read methods of `RuntimeHostContext`; typed wrappers are in
/// `InvariantReadContextExt`.
pub trait BizInvariantReadContext {
/// Hands the bytes of the record addressed by `record_kind` + `sys_id` to `f`.
///
/// The typed wrappers interpret the returned `bool` as "record found".
fn with_read_typed_raw(
&self,
record_kind: RecordKind,
sys_id: SysId,
f: &mut dyn FnMut(&[u8]),
) -> Result<bool, RuntimeHostError>;
/// Same as `with_read_typed_raw`, addressed by primary-key bytes `pk`.
fn with_read_typed_by_pk_raw(
&self,
record_kind: RecordKind,
pk: &[u8],
f: &mut dyn FnMut(&[u8]),
) -> Result<bool, RuntimeHostError>;
/// Invokes `f` with record keys of the given `kind`.
// NOTE(review): presumably once per stored record of that kind — confirm with implementors.
fn for_each_record_key_raw(
&self,
kind: RecordKind,
f: &mut dyn FnMut(RecordKey),
) -> Result<(), RuntimeHostError>;
}
/// Typed read helpers on top of [`BizInvariantReadContext`].
///
/// Unlike the host-context helpers, a callback that is invoked more than
/// once is tolerated here: only the first invocation runs `f`, later ones
/// are ignored and the first result is kept.
pub trait InvariantReadContextExt: BizInvariantReadContext {
    /// Reads the record `R` identified by `sys_id` and returns `f`'s result,
    /// or `Ok(None)` when the context reports the record as not found.
    fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, RuntimeHostError>
    where
        R: GeneratedRecordAccess,
        F: FnOnce(R::Access<'_>) -> T,
    {
        // `f` is FnOnce but the context takes FnMut, so park it in an Option.
        let mut pending = Some(f);
        let mut outcome = None;
        let found = self.with_read_typed_raw(R::KIND, sys_id, &mut |bytes| {
            if let Some(value) = pending.take().map(|apply| apply(R::wrap(bytes))) {
                outcome = Some(value);
            }
        })?;
        Ok(if found { outcome } else { None })
    }

    /// Reads the record `R` addressed by primary key `pk` and returns `f`'s
    /// result, or `Ok(None)` when the context reports it as not found.
    fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, RuntimeHostError>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
        F: FnOnce(R::Access<'_>) -> T,
    {
        let mut pending = Some(f);
        let mut outcome = None;
        let found = self.with_read_typed_by_pk_raw(R::KIND, pk.as_ref(), &mut |bytes| {
            if let Some(value) = pending.take().map(|apply| apply(R::wrap(bytes))) {
                outcome = Some(value);
            }
        })?;
        Ok(if found { outcome } else { None })
    }
}

/// Every `BizInvariantReadContext` (including trait objects) gets the typed helpers.
impl<T: BizInvariantReadContext + ?Sized> InvariantReadContextExt for T {}
/// A loaded plugin instance, as produced by `RuntimePluginFactory::create`.
pub trait RuntimePlugin {
/// Static name of the plugin.
fn name(&self) -> &'static str;
/// Schema registry describing the plugin's data model.
fn schema_registry(&self) -> SchemaRegistry;
/// Command definitions exposed by the plugin; the default is an empty slice.
fn command_definitions(&self) -> &'static [&'static CommandDefinition] {
&[]
}
/// Executes `command` against the host context `tx`.
fn run_tx(
&self,
tx: &mut dyn RuntimeHostContext,
command: &dyn RuntimeCommandEnvelope,
) -> Result<(), RuntimePluginError>;
/// Validates business invariants using the read-only context.
///
/// The default implementation performs no checks and returns `Ok(())`.
/// Unlike the other methods, failures are reported as a plain `String`.
fn validate_biz_invariants(&self, _ctx: &dyn BizInvariantReadContext) -> Result<(), String> {
Ok(())
}
/// Unload hook; the default implementation does nothing and returns `Ok(())`.
fn on_unload(&mut self) -> Result<(), RuntimePluginUnloadError> {
Ok(())
}
}
/// Read-only view of a command as passed to `RuntimePlugin::run_tx`.
///
/// Implemented in this file for `RuntimeCommandRef` and `Command`.
pub trait RuntimeCommandEnvelope {
/// Numeric kind of the command.
fn command_kind(&self) -> u8;
/// External sequence number attached to the command.
fn ext_seq(&self) -> u64;
/// Reference external time attached to the command.
// NOTE(review): the `_us` suffix suggests microseconds — confirm with producers.
fn ref_ext_time_us(&self) -> u64;
/// Raw payload bytes of the command.
fn payload(&self) -> &[u8];
}
/// Borrowed, copyable command envelope: the header fields plus a borrowed
/// payload slice.
#[derive(Debug, Clone, Copy)]
pub struct RuntimeCommandRef<'a> {
    command_kind: u8,
    ext_seq: u64,
    ref_ext_time_us: u64,
    payload: &'a [u8],
}

impl<'a> RuntimeCommandRef<'a> {
    /// Bundles the given header values and payload slice without copying
    /// the payload.
    #[inline]
    pub fn new(command_kind: u8, ext_seq: u64, ref_ext_time_us: u64, payload: &'a [u8]) -> Self {
        RuntimeCommandRef {
            payload,
            ref_ext_time_us,
            ext_seq,
            command_kind,
        }
    }
}
/// Exposes the fields of `RuntimeCommandRef` through the envelope trait;
/// each accessor returns the corresponding stored field unchanged.
impl RuntimeCommandEnvelope for RuntimeCommandRef<'_> {
/// Returns the stored `command_kind`.
#[inline(always)]
fn command_kind(&self) -> u8 {
self.command_kind
}
/// Returns the stored `ext_seq`.
#[inline(always)]
fn ext_seq(&self) -> u64 {
self.ext_seq
}
/// Returns the stored `ref_ext_time_us`.
#[inline(always)]
fn ref_ext_time_us(&self) -> u64 {
self.ref_ext_time_us
}
/// Returns the borrowed payload slice.
#[inline(always)]
fn payload(&self) -> &[u8] {
self.payload
}
}
/// Adapts the model `Command` type to the envelope trait by forwarding to
/// same-named methods on `Command`.
// NOTE(review): each body calls `self.<name>()` with the same name as the
// trait method. This only terminates because method resolution prefers an
// inherent `Command::<name>` over the trait method; if `Command` ever loses
// one of these inherent methods the call becomes unconditional recursion.
// Confirm the inherent methods exist in `statevec_model::command`.
impl RuntimeCommandEnvelope for Command {
/// Forwards to `Command::command_kind`.
#[inline(always)]
fn command_kind(&self) -> u8 {
self.command_kind()
}
/// Forwards to `Command::ext_seq`.
#[inline(always)]
fn ext_seq(&self) -> u64 {
self.ext_seq()
}
/// Forwards to `Command::ref_ext_time_us`.
#[inline(always)]
fn ref_ext_time_us(&self) -> u64 {
self.ref_ext_time_us()
}
/// Forwards to `Command::payload`.
#[inline(always)]
fn payload(&self) -> &[u8] {
self.payload()
}
}
/// Read side of a transaction context, working on raw record bytes.
///
/// Has a generic method, so it cannot be used as a trait object.
pub trait TxReadContext {
/// Error type shared by all fallible operations of the `Tx*Context` family.
type Error;
/// Passes the bytes of the record identified by `key` to `f` and returns
/// its result.
// NOTE(review): `Ok(None)` presumably means the record does not exist — confirm with implementors.
fn with_read_raw<T>(
&self,
key: RecordKey,
f: impl FnOnce(&[u8]) -> T,
) -> Result<Option<T>, Self::Error>;
/// Invokes `f` with record keys of the given `kind`; this variant is infallible.
fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey));
}
/// Adds primary-key resolution to a read context.
pub trait TxPkContext: TxReadContext {
/// Resolves primary-key bytes `pk` of a record of `kind` to its `SysId`.
///
/// The typed helpers in this file treat `Ok(None)` as "no such record".
fn resolve_pk(&self, kind: RecordKind, pk: &[u8]) -> Result<Option<SysId>, Self::Error>;
}
/// Write side of a transaction context, working on raw record bytes.
pub trait TxWriteContext: TxReadContext {
/// Creates a record of `kind` from the fully initialised bytes `data`
/// and returns its key.
fn create_raw(&mut self, kind: RecordKind, data: Vec<u8>) -> Result<RecordKey, Self::Error>;
/// Lets `f` mutate the bytes of the record identified by `key` and returns
/// its result.
// NOTE(review): `Ok(None)` presumably means the record does not exist — confirm with implementors.
fn update_raw<T>(
&mut self,
key: RecordKey,
f: impl FnOnce(&mut [u8]) -> T,
) -> Result<Option<T>, Self::Error>;
/// Deletes the record identified by `key`.
// NOTE(review): the `bool` presumably reports whether a record was removed — confirm.
fn delete_raw(&mut self, key: RecordKey) -> Result<bool, Self::Error>;
/// Emits an event of kind `event_kind`; this operation is infallible here.
fn emit_event_raw(&mut self, event_kind: u8, payload: Vec<u8>);
/// Optional debug logging hook; the default implementation discards the message.
fn debug_log(&mut self, _message: String) {}
}
/// Extends the write context with creation under a caller-chosen `SysId`.
pub trait TxSysIdCreateContext: TxWriteContext {
/// Creates a record of `kind` with the given `sys_id` from `data` and
/// returns its key.
// NOTE(review): behaviour when `sys_id` is already in use is not visible here.
fn create_with_sys_id_raw(
&mut self,
kind: RecordKind,
sys_id: SysId,
data: Vec<u8>,
) -> Result<RecordKey, Self::Error>;
}
/// Marker trait for a full transaction context: primary-key resolution plus
/// the complete read/write surface (via `TxSysIdCreateContext`).
pub trait TxContext: TxPkContext + TxSysIdCreateContext {}
/// Implemented automatically for every type that provides both supertraits.
impl<T: TxPkContext + TxSysIdCreateContext + ?Sized> TxContext for T {}
/// Typed transaction API used by plugin code.
///
/// Implemented in this file for every `TxContext` and for
/// `dyn RuntimeHostContext`, so the same plugin logic can run against
/// either backend.
pub trait TypedTxContext {
    /// Error type of the fallible operations.
    type Error;

    /// Reads the record `R` identified by `sys_id` and returns `f`'s result.
    fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, Self::Error>
    where
        R: GeneratedRecordAccess,
        F: FnOnce(R::Access<'_>) -> T;

    /// Reads the record `R` addressed by primary key `pk` and returns `f`'s result.
    fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, Self::Error>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
        F: FnOnce(R::Access<'_>) -> T;

    /// Creates a record `R`, populated by `init`, and returns its key.
    fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, Self::Error>
    where
        R: GeneratedRecordAccess,
        F: for<'b> FnOnce(&mut R::NewBuilder<'b>);

    /// Updates the record `R` addressed by `pk` and returns `f`'s result.
    fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, Self::Error>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
        F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T;

    /// Runs `update` if `update_typed_by_pk` yields a value; otherwise
    /// creates a new record and runs `create` on its builder.
    ///
    /// Note that `pk` is only used for the lookup; it is not passed to the
    /// create path.
    ///
    /// # Panics
    /// Panics if `create_typed` succeeds without having invoked the init
    /// closure.
    fn update_or_create_typed_by_pk<R, P, T, FU, FC>(
        &mut self,
        pk: P,
        update: FU,
        create: FC,
    ) -> Result<T, Self::Error>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
        FU: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
        FC: for<'b> FnOnce(&mut R::NewBuilder<'b>) -> T,
    {
        match self.update_typed_by_pk::<R, P, T, FU>(pk, update)? {
            Some(updated) => Ok(updated),
            None => {
                let mut created = None;
                self.create_typed::<R, _>(|builder| {
                    created = Some(create(builder));
                })?;
                Ok(created.expect("create closure must produce a value"))
            }
        }
    }

    /// Deletes the record `R` addressed by `pk`.
    fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, Self::Error>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>;

    /// Emits `payload` as an event of type `E`; infallible at this level.
    fn emit_typed_event<E>(&mut self, payload: Vec<u8>)
    where
        E: GeneratedEventAccess;

    /// Invokes `f` with record keys of the given `kind`; infallible at this level.
    fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey));

    /// Optional debug logging hook; the default implementation discards the message.
    fn debug_log(&mut self, _message: String) {}
}
/// Typed API for every raw `TxContext`: primary keys are resolved through
/// `TxPkContext::resolve_pk`, then the raw read/write methods are used with
/// the accessors/builders of `R`.
impl<Ctx: TxContext + ?Sized> TypedTxContext for Ctx {
    type Error = <Ctx as TxReadContext>::Error;

    /// Reads the record keyed by (`R::KIND`, `sys_id`) through `with_read_raw`.
    fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, Self::Error>
    where
        R: GeneratedRecordAccess,
        F: FnOnce(R::Access<'_>) -> T,
    {
        let key = RecordKey {
            kind: R::KIND,
            sys_id,
        };
        TxReadContext::with_read_raw(self, key, |bytes| f(R::wrap(bytes)))
    }

    /// Resolves `pk` and reads the record; `Ok(None)` when `pk` is unknown.
    fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, Self::Error>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
        F: FnOnce(R::Access<'_>) -> T,
    {
        on_typed_tx_with_read_typed_by_pk();
        match TxPkContext::resolve_pk(self, R::KIND, pk.as_ref())? {
            Some(sys_id) => TypedTxContext::with_read_typed::<R, T, F>(self, sys_id, f),
            None => Ok(None),
        }
    }

    /// Builds the record in a zeroed buffer of `R::DATA_LEN` bytes, lets
    /// `init` fill it, then hands the bytes to `create_raw`.
    fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, Self::Error>
    where
        R: GeneratedRecordAccess,
        F: for<'b> FnOnce(&mut R::NewBuilder<'b>),
    {
        let mut buffer = vec![0u8; R::DATA_LEN];
        {
            // Scope the builder so its borrow of `buffer` ends before the move below.
            let mut builder = R::wrap_new(&mut buffer);
            init(&mut builder);
        }
        TxWriteContext::create_raw(self, R::KIND, buffer)
    }

    /// Resolves `pk` and updates the record in place; `Ok(None)` when `pk`
    /// is unknown.
    fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, Self::Error>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
        F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
    {
        on_typed_tx_update_typed_by_pk();
        match TxPkContext::resolve_pk(self, R::KIND, pk.as_ref())? {
            None => Ok(None),
            Some(sys_id) => {
                let key = RecordKey {
                    kind: R::KIND,
                    sys_id,
                };
                TxWriteContext::update_raw(self, key, |bytes| {
                    let mut builder = R::wrap_update(bytes);
                    f(&mut builder)
                })
            }
        }
    }

    /// Same logic as the trait default, plus the throughput-probe hook.
    ///
    /// # Panics
    /// Panics if `create_typed` succeeds without having invoked the init
    /// closure.
    fn update_or_create_typed_by_pk<R, P, T, FU, FC>(
        &mut self,
        pk: P,
        update: FU,
        create: FC,
    ) -> Result<T, Self::Error>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
        FU: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
        FC: for<'b> FnOnce(&mut R::NewBuilder<'b>) -> T,
    {
        on_typed_tx_update_or_create_typed_by_pk();
        match self.update_typed_by_pk::<R, P, T, FU>(pk, update)? {
            Some(updated) => Ok(updated),
            None => {
                let mut created = None;
                self.create_typed::<R, _>(|builder| {
                    created = Some(create(builder));
                })?;
                Ok(created.expect("create closure must produce a value"))
            }
        }
    }

    /// Resolves `pk` and deletes the record; `Ok(false)` when `pk` is unknown.
    fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, Self::Error>
    where
        R: GeneratedRecordAccess,
        P: AsRef<[u8]>,
    {
        match TxPkContext::resolve_pk(self, R::KIND, pk.as_ref())? {
            None => Ok(false),
            Some(sys_id) => {
                let key = RecordKey {
                    kind: R::KIND,
                    sys_id,
                };
                TxWriteContext::delete_raw(self, key)
            }
        }
    }

    /// Forwards the payload to `emit_event_raw` tagged with `E::KIND`.
    fn emit_typed_event<E>(&mut self, payload: Vec<u8>)
    where
        E: GeneratedEventAccess,
    {
        let event_kind = E::KIND;
        TxWriteContext::emit_event_raw(self, event_kind, payload);
    }

    /// Forwards to `TxReadContext::for_each_record_key`.
    fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey)) {
        TxReadContext::for_each_record_key(self, kind, f);
    }

    /// Forwards to `TxWriteContext::debug_log`.
    fn debug_log(&mut self, message: String) {
        TxWriteContext::debug_log(self, message);
    }
}
/// Typed API for host trait objects: every method delegates to the
/// same-named helper of `RuntimeHostContextExt`.
///
/// The calls are written in fully-qualified form because both
/// `RuntimeHostContextExt` and `TypedTxContext` declare methods with these
/// names for `dyn RuntimeHostContext`.
///
/// `update_or_create_typed_by_pk` is not overridden, so the trait's default
/// implementation is used.
///
/// The infallible methods of `TypedTxContext` cannot report host errors:
/// `emit_typed_event` panics on failure, while `for_each_record_key` and
/// `debug_log` silently discard the error.
impl TypedTxContext for dyn RuntimeHostContext + '_ {
type Error = RuntimeHostError;
/// Delegates to `RuntimeHostContextExt::with_read_typed`.
fn with_read_typed<R, T, F>(&self, sys_id: SysId, f: F) -> Result<Option<T>, Self::Error>
where
R: GeneratedRecordAccess,
F: FnOnce(R::Access<'_>) -> T,
{
RuntimeHostContextExt::with_read_typed::<R, T, F>(self, sys_id, f)
}
/// Delegates to `RuntimeHostContextExt::with_read_typed_by_pk`.
fn with_read_typed_by_pk<R, P, T, F>(&self, pk: P, f: F) -> Result<Option<T>, Self::Error>
where
R: GeneratedRecordAccess,
P: AsRef<[u8]>,
F: FnOnce(R::Access<'_>) -> T,
{
RuntimeHostContextExt::with_read_typed_by_pk::<R, P, T, F>(self, pk, f)
}
/// Delegates to `RuntimeHostContextExt::create_typed`.
fn create_typed<R, F>(&mut self, init: F) -> Result<RecordKey, Self::Error>
where
R: GeneratedRecordAccess,
F: for<'b> FnOnce(&mut R::NewBuilder<'b>),
{
RuntimeHostContextExt::create_typed::<R, F>(self, init)
}
/// Delegates to `RuntimeHostContextExt::update_typed_by_pk`.
fn update_typed_by_pk<R, P, T, F>(&mut self, pk: P, f: F) -> Result<Option<T>, Self::Error>
where
R: GeneratedRecordAccess,
P: AsRef<[u8]>,
F: for<'b> FnOnce(&mut R::UpdateBuilder<'b>) -> T,
{
RuntimeHostContextExt::update_typed_by_pk::<R, P, T, F>(self, pk, f)
}
/// Delegates to `RuntimeHostContextExt::delete_by_pk`.
fn delete_by_pk<R, P>(&mut self, pk: P) -> Result<bool, Self::Error>
where
R: GeneratedRecordAccess,
P: AsRef<[u8]>,
{
RuntimeHostContextExt::delete_by_pk::<R, P>(self, pk)
}
/// Emits the event through the host.
///
/// # Panics
/// Panics with "host emit_typed_event_raw failed" if the host returns an
/// error, because this trait method has no way to return it.
fn emit_typed_event<E>(&mut self, payload: Vec<u8>)
where
E: GeneratedEventAccess,
{
RuntimeHostContextExt::emit_typed_event::<E>(self, payload)
.expect("host emit_typed_event_raw failed");
}
/// Iterates record keys through the host; a host error is discarded.
fn for_each_record_key(&self, kind: RecordKind, f: &mut dyn FnMut(RecordKey)) {
// NOTE(review): the error is dropped, so a failed iteration is
// indistinguishable from a complete (or empty) one for the caller.
let _ = RuntimeHostContextExt::for_each_record_key(self, kind, f);
}
/// Forwards the message to the host's `debug_log`; a host error is discarded.
fn debug_log(&mut self, message: String) {
let _ = RuntimeHostContext::debug_log(self, message);
}
}
/// Producer side of a queue of byte records.
pub trait QueueProducer {
/// Error type of `append`.
type Error;
/// Appends `record` to the queue.
// NOTE(review): the returned `u64` is presumably the `ext_seq` assigned to the record — confirm with implementors.
fn append(&mut self, record: &[u8]) -> Result<u64, Self::Error>;
}
/// A record returned by `QueueConsumer::poll`, together with its queue metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueRecord<T> {
/// External sequence number of the record.
pub ext_seq: u64,
/// Reference external time of the record.
// NOTE(review): the `_us` suffix suggests microseconds — confirm with producers.
pub ref_ext_time_us: u64,
/// The record payload itself.
pub record: T,
}
/// Consumer side of a queue.
pub trait QueueConsumer {
/// Payload type carried inside each `QueueRecord`.
type Record;
/// Error type of the consumer operations.
type Error;
/// Fetches the next record, if any.
// NOTE(review): `Ok(None)` presumably means no record is currently available — confirm.
fn poll(&mut self) -> Result<Option<QueueRecord<Self::Record>>, Self::Error>;
/// Commits consumption progress up to `ext_seq`.
// NOTE(review): presumably inclusive of `ext_seq` — confirm with implementors.
fn commit_through(&mut self, ext_seq: u64) -> Result<(), Self::Error>;
}
/// Lets a consumer report where it would resume.
pub trait QueueConsumerResume {
/// Error type of `resume_next_ext_seq`.
type Error;
/// Returns the next `ext_seq` to resume from, if one is known.
// NOTE(review): the meaning of `Ok(None)` (e.g. nothing committed yet) is not visible here — confirm.
fn resume_next_ext_seq(&mut self) -> Result<Option<u64>, Self::Error>;
}
/// Lookup of the committed outcome for a given external sequence number.
///
/// Also implemented for closures of the matching shape (see the blanket
/// impl for `FnMut(u64) -> Result<Option<CommittedStatus>, E>`).
pub trait CommittedResultQuery {
/// Error type of the query.
type Error;
/// Returns the committed status recorded for `ext_seq`, if any.
fn query_committed_by_ext_seq(
&mut self,
ext_seq: u64,
) -> Result<Option<CommittedStatus>, Self::Error>;
}
/// Allows any `FnMut(u64) -> Result<Option<CommittedStatus>, E>` closure or
/// function to be used where a `CommittedResultQuery` is expected.
impl<F, E> CommittedResultQuery for F
where
F: FnMut(u64) -> Result<Option<CommittedStatus>, E>,
{
type Error = E;
/// Calls the closure with `ext_seq` and returns its result unchanged.
fn query_committed_by_ext_seq(
&mut self,
ext_seq: u64,
) -> Result<Option<CommittedStatus>, Self::Error> {
self(ext_seq)
}
}
/// Version byte written at offset 0 of an encoded submit request.
pub const SUBMIT_REQUEST_RECORD_VERSION: u8 = 1;
// Submit request header: version (1 byte) + command kind (1 byte) +
// little-endian u32 payload length (4 bytes).
const SUBMIT_REQUEST_HEADER_LEN: usize = 6;
// Offset of the payload length field, right after version and command kind.
const SUBMIT_REQUEST_PAYLOAD_LEN_OFFSET: usize = 2;
/// Version byte written at offset 0 of an encoded committed-result record.
pub const COMMITTED_RESULT_RECORD_VERSION: u8 = 2;
/// Fixed size of an encoded committed-result record:
/// version (1) + ext_seq (8) + status tag (1) + status value (8).
pub const COMMITTED_RESULT_RECORD_LEN: usize = 18;
// Offset of the status tag, right after the version byte and the 8-byte ext_seq.
const COMMITTED_RESULT_STATUS_TAG_OFFSET: usize = 9;
// Status tag for `CommittedStatus::Committed`; the status value holds `tx_seq`.
const COMMITTED_STATUS_TAG_COMMITTED: u8 = 1;
// Status tag for `CommittedStatus::Rejected`; the status value holds the error code.
const COMMITTED_STATUS_TAG_REJECTED: u8 = 2;
/// A command submission: the command kind plus its opaque payload bytes.
///
/// This is the value carried by the submit-request wire format
/// (`encode_submit_request` / `decode_submit_request`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubmitRequest {
    /// Numeric kind of the command being submitted.
    pub command_kind: u8,
    /// Raw payload bytes of the command.
    pub payload: Vec<u8>,
}

impl SubmitRequest {
    /// Creates a request, taking ownership of `payload`.
    pub fn new(command_kind: u8, payload: Vec<u8>) -> Self {
        SubmitRequest {
            payload,
            command_kind,
        }
    }
}
/// Receipt carrying the external sequence number of a queued item.
// NOTE(review): presumably returned after a request was appended to the queue — confirm with callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueReceipt {
/// External sequence number of the queued item.
pub ext_seq: u64,
}
/// Receipt pairing an external sequence number with its committed outcome.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommittedReceipt {
/// External sequence number the outcome belongs to.
pub ext_seq: u64,
/// Outcome recorded for that sequence number.
pub status: CommittedStatus,
}
/// In-memory form of the fixed-size committed-result wire record
/// (see `encode_committed_result_record_fixed` / `decode_committed_result_record`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommittedResultRecord {
/// External sequence number the outcome belongs to.
pub ext_seq: u64,
/// Outcome recorded for that sequence number.
pub status: CommittedStatus,
}
/// Reason code attached to a rejected command.
///
/// The discriminants are the wire values used by the committed-result codec.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum RejectedErrorCode {
    /// Wire value 1.
    CommandRejected = 1,
    /// Wire value 103.
    RuntimePanic = 103,
}

impl RejectedErrorCode {
    /// Returns the wire value (the enum discriminant).
    pub fn to_u16(self) -> u16 {
        self as u16
    }

    /// Maps a wire value back to its variant; unknown values yield `None`.
    pub fn from_u16(value: u16) -> Option<Self> {
        // Compare against each variant's own discriminant so the numeric
        // values are spelled out only once, in the enum definition.
        [Self::CommandRejected, Self::RuntimePanic]
            .iter()
            .copied()
            .find(|code| code.to_u16() == value)
    }
}
/// Final outcome recorded for a submitted command.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommittedStatus {
/// The command was committed; `tx_seq` is the transaction sequence number
/// carried in the status value of the wire record.
Committed { tx_seq: u64 },
/// The command was rejected with the given error code.
Rejected { error_code: RejectedErrorCode },
}
/// Failure modes of the queue record encoders/decoders in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueueCodecError {
    /// The input is shorter than the record requires.
    Truncated,
    /// The input is longer than the record; lengths are in bytes.
    TrailingBytes { expected: usize, actual: usize },
    /// A field is too long to be encoded; `len` is its length in bytes.
    FieldTooLarge { field: &'static str, len: u64 },
    /// The version byte does not match the supported version.
    UnsupportedVersion { expected: u8, found: u8 },
    /// A field holds a value that has no valid interpretation.
    InvalidFieldValue { field: &'static str, value: u64 },
}

impl std::fmt::Display for QueueCodecError {
    /// Renders a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // All payload fields are `Copy`, so matching on `*self` binds them by value.
        match *self {
            Self::Truncated => f.write_str("truncated queue record"),
            Self::TrailingBytes { expected, actual } => write!(
                f,
                "queue record has trailing bytes: expected {expected}, actual {actual}"
            ),
            Self::FieldTooLarge { field, len } => {
                write!(f, "queue record field '{field}' too large: {len} bytes")
            }
            Self::UnsupportedVersion { expected, found } => write!(
                f,
                "unsupported queue record version: expected {expected}, found {found}"
            ),
            Self::InvalidFieldValue { field, value } => {
                write!(f, "invalid value for queue record field '{field}': {value}")
            }
        }
    }
}

impl std::error::Error for QueueCodecError {}
/// Serialises a submit request.
///
/// Layout: version byte, command kind byte, little-endian `u32` payload
/// length, then the payload bytes.
///
/// # Errors
/// Returns `QueueCodecError::FieldTooLarge` when the payload length does not
/// fit in a `u32`.
pub fn encode_submit_request(request: &SubmitRequest) -> Result<Vec<u8>, QueueCodecError> {
    let payload = request.payload.as_slice();
    let Ok(payload_len) = u32::try_from(payload.len()) else {
        return Err(QueueCodecError::FieldTooLarge {
            field: "payload",
            len: payload.len() as u64,
        });
    };

    let mut record = Vec::with_capacity(SUBMIT_REQUEST_HEADER_LEN + payload.len());
    record.extend_from_slice(&[SUBMIT_REQUEST_RECORD_VERSION, request.command_kind]);
    record.extend_from_slice(&payload_len.to_le_bytes());
    record.extend_from_slice(payload);
    Ok(record)
}
pub fn decode_submit_request(bytes: &[u8]) -> Result<SubmitRequest, QueueCodecError> {
if bytes.len() < SUBMIT_REQUEST_HEADER_LEN {
return Err(QueueCodecError::Truncated);
}
let version = bytes[0];
if version != SUBMIT_REQUEST_RECORD_VERSION {
return Err(QueueCodecError::UnsupportedVersion {
expected: SUBMIT_REQUEST_RECORD_VERSION,
found: version,
});
}
let command_kind = bytes[1];
let payload_len = u32::from_le_bytes(
bytes[SUBMIT_REQUEST_PAYLOAD_LEN_OFFSET..SUBMIT_REQUEST_PAYLOAD_LEN_OFFSET + 4]
.try_into()
.map_err(|_| QueueCodecError::Truncated)?,
) as usize;
let payload_off = SUBMIT_REQUEST_HEADER_LEN;
let expected = payload_off + payload_len;
if bytes.len() < expected {
return Err(QueueCodecError::Truncated);
}
if bytes.len() != expected {
return Err(QueueCodecError::TrailingBytes {
expected,
actual: bytes.len(),
});
}
Ok(SubmitRequest {
command_kind,
payload: bytes[payload_off..expected].to_vec(),
})
}
/// Serialises a committed-result record into its fixed-size wire form.
///
/// Layout: version byte, little-endian `ext_seq` (bytes 1..9), status tag
/// (byte 9), little-endian 64-bit status value (bytes 10..18). The status
/// value is `tx_seq` for committed records and the error code, widened to
/// `u64`, for rejected ones.
pub fn encode_committed_result_record_fixed(
    record: CommittedResultRecord,
) -> [u8; COMMITTED_RESULT_RECORD_LEN] {
    // Reduce the status to its (tag, value) wire pair first, then write every
    // field exactly once.
    let (status_tag, status_value) = match record.status {
        CommittedStatus::Committed { tx_seq } => (COMMITTED_STATUS_TAG_COMMITTED, tx_seq),
        CommittedStatus::Rejected { error_code } => (
            COMMITTED_STATUS_TAG_REJECTED,
            u64::from(error_code.to_u16()),
        ),
    };

    let mut encoded = [0u8; COMMITTED_RESULT_RECORD_LEN];
    encoded[0] = COMMITTED_RESULT_RECORD_VERSION;
    encoded[1..9].copy_from_slice(&record.ext_seq.to_le_bytes());
    encoded[COMMITTED_RESULT_STATUS_TAG_OFFSET] = status_tag;
    encoded[10..18].copy_from_slice(&status_value.to_le_bytes());
    encoded
}
/// Heap-allocated variant of `encode_committed_result_record_fixed`: returns
/// the same bytes as a `Vec<u8>`.
pub fn encode_committed_result_record(record: CommittedResultRecord) -> Vec<u8> {
    let fixed = encode_committed_result_record_fixed(record);
    Vec::from(fixed)
}
/// Parses a committed-result record produced by
/// `encode_committed_result_record_fixed`.
///
/// # Errors
/// * `Truncated` — fewer than `COMMITTED_RESULT_RECORD_LEN` bytes.
/// * `UnsupportedVersion` — the version byte is not
///   `COMMITTED_RESULT_RECORD_VERSION` (checked before the trailing-bytes
///   check).
/// * `TrailingBytes` — more than `COMMITTED_RESULT_RECORD_LEN` bytes.
/// * `InvalidFieldValue` — unknown status tag (`"status_tag"`), or a rejected
///   record whose value is not a known error code (`"error_code"`).
pub fn decode_committed_result_record(
    bytes: &[u8],
) -> Result<CommittedResultRecord, QueueCodecError> {
    if bytes.len() < COMMITTED_RESULT_RECORD_LEN {
        return Err(QueueCodecError::Truncated);
    }
    let found = bytes[0];
    if found != COMMITTED_RESULT_RECORD_VERSION {
        return Err(QueueCodecError::UnsupportedVersion {
            expected: COMMITTED_RESULT_RECORD_VERSION,
            found,
        });
    }
    if bytes.len() != COMMITTED_RESULT_RECORD_LEN {
        return Err(QueueCodecError::TrailingBytes {
            expected: COMMITTED_RESULT_RECORD_LEN,
            actual: bytes.len(),
        });
    }

    // Reads a little-endian u64 from the given 8-byte window of the record.
    let read_u64 = |window: std::ops::Range<usize>| -> Result<u64, QueueCodecError> {
        let raw: [u8; 8] = bytes[window]
            .try_into()
            .map_err(|_| QueueCodecError::Truncated)?;
        Ok(u64::from_le_bytes(raw))
    };

    let status_value = read_u64(10..18)?;
    let status = match bytes[COMMITTED_RESULT_STATUS_TAG_OFFSET] {
        COMMITTED_STATUS_TAG_COMMITTED => CommittedStatus::Committed {
            tx_seq: status_value,
        },
        COMMITTED_STATUS_TAG_REJECTED => {
            // Both "does not fit in u16" and "unknown code" report the same error.
            let error_code = u16::try_from(status_value)
                .ok()
                .and_then(RejectedErrorCode::from_u16)
                .ok_or(QueueCodecError::InvalidFieldValue {
                    field: "error_code",
                    value: status_value,
                })?;
            CommittedStatus::Rejected { error_code }
        }
        unknown_tag => {
            return Err(QueueCodecError::InvalidFieldValue {
                field: "status_tag",
                value: u64::from(unknown_tag),
            });
        }
    };

    let ext_seq = read_u64(1..9)?;
    Ok(CommittedResultRecord { ext_seq, status })
}