use alloc::{collections::BTreeSet, vec::Vec};
use jacquard_core::{
MaterializedRoute, NodeId, RouteCommitment, RouteError, RouteEvent, RouteEventStamped, RouteId,
RouteRuntimeError,
};
use jacquard_traits::{OrderEffects, RouteEventLogEffects, StorageEffects, TimeEffects};
trait StorageResultExt<T> {
fn storage_invalid(self) -> Result<T, RouteError>;
}
impl<T, E> StorageResultExt<T> for Result<T, E> {
fn storage_invalid(self) -> Result<T, RouteError> {
match self {
Ok(value) => Ok(value),
Err(_) => Err(RouteError::Runtime(RouteRuntimeError::Invalidated)),
}
}
}
use serde::{Deserialize, Serialize};
const ROUTER_CHECKPOINT_BYTES_MAX: usize = 1_048_576;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct RouterCheckpointRecord {
pub(crate) route: MaterializedRoute,
pub(crate) commitments: Vec<RouteCommitment>,
}
pub(crate) trait RouterRuntimeEffects:
TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects
{
}
impl<T> RouterRuntimeEffects for T where
T: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects
{
}
pub(crate) struct RouterRuntimeAdapter<'a, Effects> {
local_node_id: NodeId,
effects: &'a mut Effects,
}
impl<'a, Effects> RouterRuntimeAdapter<'a, Effects>
where
Effects: RouterRuntimeEffects,
{
pub(crate) fn new(local_node_id: NodeId, effects: &'a mut Effects) -> Self {
Self {
local_node_id,
effects,
}
}
pub(crate) fn persist_route(
&mut self,
record: &RouterCheckpointRecord,
) -> Result<(), RouteError> {
let route_key =
route_storage_key(&self.local_node_id, &record.route.identity.stamp.route_id);
let route_bytes = encode_checkpoint_value(record)?;
self.effects
.store_bytes(&route_key, &route_bytes)
.storage_invalid()?;
let mut registry = self.load_route_registry()?;
registry.insert(record.route.identity.stamp.route_id);
if let Err(error) = self.store_route_registry(®istry) {
let _rollback_remove_failed = self.effects.remove_bytes(&route_key).is_err();
return Err(error);
}
Ok(())
}
pub(crate) fn restore_route_record(
&mut self,
record: &RouterCheckpointRecord,
) -> Result<(), RouteError> {
self.persist_route(record)
}
pub(crate) fn remove_route(&mut self, route_id: &RouteId) -> Result<(), RouteError> {
let route_key = route_storage_key(&self.local_node_id, route_id);
self.effects.remove_bytes(&route_key).storage_invalid()?;
let mut registry = self.load_route_registry()?;
registry.remove(route_id);
self.store_route_registry(®istry)
}
pub(crate) fn load_routes(
&mut self,
) -> Result<Vec<(RouteId, RouterCheckpointRecord)>, RouteError> {
let registry = self.load_route_registry()?;
let mut recovered = Vec::new();
let mut pruned_registry = registry.clone();
for route_id in registry {
let route_key = route_storage_key(&self.local_node_id, &route_id);
let Some(bytes) = self.effects.load_bytes(&route_key).storage_invalid()? else {
pruned_registry.remove(&route_id);
continue;
};
if bytes.len() > ROUTER_CHECKPOINT_BYTES_MAX {
pruned_registry.remove(&route_id);
let _remove_failed = self.effects.remove_bytes(&route_key).is_err();
continue;
}
match decode_checkpoint_value::<RouterCheckpointRecord>(&bytes) {
Ok(record) => recovered.push((route_id, record)),
Err(_) => {
pruned_registry.remove(&route_id);
let _remove_failed = self.effects.remove_bytes(&route_key).is_err();
}
}
}
if pruned_registry != self.load_route_registry()? {
self.store_route_registry(&pruned_registry)?;
}
Ok(recovered)
}
pub(crate) fn record_route_event(&mut self, event: RouteEvent) -> Result<(), RouteError> {
let order_stamp = self.effects.next_order_stamp();
let emitted_at_tick = self.effects.now_tick();
self.effects
.record_route_event(RouteEventStamped {
order_stamp,
emitted_at_tick,
event,
})
.storage_invalid()
}
fn load_route_registry(&mut self) -> Result<BTreeSet<RouteId>, RouteError> {
let registry_key = route_registry_storage_key(&self.local_node_id);
let Some(bytes) = self.effects.load_bytes(®istry_key).storage_invalid()? else {
return Ok(BTreeSet::new());
};
decode_checkpoint_value(&bytes)
}
fn store_route_registry(&mut self, registry: &BTreeSet<RouteId>) -> Result<(), RouteError> {
let registry_key = route_registry_storage_key(&self.local_node_id);
if registry.is_empty() {
return self.effects.remove_bytes(®istry_key).storage_invalid();
}
let registry_bytes = encode_checkpoint_value(registry)?;
self.effects
.store_bytes(®istry_key, ®istry_bytes)
.storage_invalid()
}
}
fn route_registry_storage_key(local_node_id: &NodeId) -> Vec<u8> {
let mut key = b"router/".to_vec();
key.extend_from_slice(&local_node_id.0);
key.extend_from_slice(b"/routes");
key
}
fn route_storage_key(local_node_id: &NodeId, route_id: &RouteId) -> Vec<u8> {
let mut key = b"router/".to_vec();
key.extend_from_slice(&local_node_id.0);
key.extend_from_slice(b"/route/");
key.extend_from_slice(&route_id.0);
key
}
fn encode_checkpoint_value<T>(value: &T) -> Result<Vec<u8>, RouteError>
where
T: Serialize,
{
postcard::to_allocvec(value).storage_invalid()
}
fn decode_checkpoint_value<T>(bytes: &[u8]) -> Result<T, RouteError>
where
T: for<'de> Deserialize<'de>,
{
if bytes.len() > ROUTER_CHECKPOINT_BYTES_MAX {
return Err(RouteError::Runtime(RouteRuntimeError::Invalidated));
}
postcard::from_bytes(bytes).storage_invalid()
}