use core::marker::PhantomData;
use std::collections::HashMap;
use arkhe_forge_core::activity::{ActivityId, ActivityRecord, EntityShellId};
use arkhe_forge_core::actor::{ActorId, ActorProfile, UserBinding};
use arkhe_forge_core::brand::ShellId;
use arkhe_forge_core::context::EventRecord;
use arkhe_forge_core::entry::{EntryBody, EntryCore, EntryId, EntryParentDepth};
use arkhe_forge_core::event::{ArkheEvent, CrossShellActivity};
use arkhe_forge_core::space::{ParentChainDepth, SpaceConfig, SpaceId, SpaceMembership};
use arkhe_kernel::abi::{InstanceId, Tick, TypeCode};
use serde::{Deserialize, Serialize};
use crate::manifest::ManifestSnapshot;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ObserverState {
Passive,
Active,
Draining,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PromotionDecision {
Promote,
Wait,
}
pub const HF2_HEALTH_QUORUM_MIN: usize = 2;
pub struct ProjectionContext<'i> {
pub tick: Tick,
pub instance_id: InstanceId,
pub manifest: Option<&'i ManifestSnapshot>,
_phantom: PhantomData<&'i ()>,
}
impl<'i> ProjectionContext<'i> {
#[inline]
#[must_use]
pub fn new(tick: Tick, instance_id: InstanceId) -> Self {
Self {
tick,
instance_id,
manifest: None,
_phantom: PhantomData,
}
}
#[inline]
#[must_use]
pub fn with_manifest(
tick: Tick,
instance_id: InstanceId,
manifest: &'i ManifestSnapshot,
) -> Self {
Self {
tick,
instance_id,
manifest: Some(manifest),
_phantom: PhantomData,
}
}
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ProjectionError {
#[error("projection sequence backward: last {last}, incoming {incoming}")]
SequenceBackward {
last: u64,
incoming: u64,
},
#[error("projection sequence gap: last {last}, incoming {incoming}")]
SequenceGap {
last: u64,
incoming: u64,
},
#[error("observer not active: current state {state:?}")]
NotActive {
state: ObserverState,
},
#[error("projection storage error: {0}")]
Storage(&'static str),
#[error("event decode failed: {0}")]
DecodeFailed(&'static str),
#[error("projection row missing")]
MissingRow,
}
pub trait Projection: Send + Sync {
fn observes(&self) -> &[TypeCode];
fn on_event(
&mut self,
event: &EventRecord,
ctx: &ProjectionContext<'_>,
) -> Result<(), ProjectionError>;
fn on_state_change(&mut self, _new_state: ObserverState) -> Result<(), ProjectionError> {
Ok(())
}
fn last_applied(&self) -> Option<(u64, Tick)>;
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub struct ProjectionCursor {
pub sequence: u64,
pub tick: Tick,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ActorProjection {
pub schema_version: u16,
pub actor_id: ActorId,
pub profile: ActorProfile,
pub user_binding: Option<UserBinding>,
pub cursor: Option<ProjectionCursor>,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SpaceProjection {
pub schema_version: u16,
pub space_id: SpaceId,
pub config: SpaceConfig,
pub parent_chain_depth: Option<ParentChainDepth>,
pub membership: Option<SpaceMembership>,
pub cursor: Option<ProjectionCursor>,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct EntryProjection {
pub schema_version: u16,
pub entry_id: EntryId,
pub core: EntryCore,
pub body: Option<EntryBody>,
pub parent_depth: Option<EntryParentDepth>,
pub cursor: Option<ProjectionCursor>,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ActivityProjection {
pub schema_version: u16,
pub activity_id: ActivityId,
pub record: ActivityRecord,
pub entity_shell_id: Option<EntityShellId>,
pub cursor: Option<ProjectionCursor>,
}
pub trait ProjectionStore: Send + Sync {
fn upsert_actor(&mut self, row: &ActorProjection) -> Result<(), ProjectionError>;
fn upsert_space(&mut self, row: &SpaceProjection) -> Result<(), ProjectionError>;
fn upsert_entry(&mut self, row: &EntryProjection) -> Result<(), ProjectionError>;
fn upsert_activity(&mut self, row: &ActivityProjection) -> Result<(), ProjectionError>;
fn get_actor(&self, actor_id: ActorId) -> Option<ActorProjection>;
fn get_space(&self, space_id: SpaceId) -> Option<SpaceProjection>;
fn get_entry(&self, entry_id: EntryId) -> Option<EntryProjection>;
fn get_activity(&self, activity_id: ActivityId) -> Option<ActivityProjection>;
}
#[derive(Debug, Default)]
pub struct InMemoryProjectionStore {
actors: HashMap<ActorId, ActorProjection>,
spaces: HashMap<SpaceId, SpaceProjection>,
entries: HashMap<EntryId, EntryProjection>,
activities: HashMap<ActivityId, ActivityProjection>,
}
impl InMemoryProjectionStore {
#[inline]
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl ProjectionStore for InMemoryProjectionStore {
fn upsert_actor(&mut self, row: &ActorProjection) -> Result<(), ProjectionError> {
self.actors.insert(row.actor_id, row.clone());
Ok(())
}
fn upsert_space(&mut self, row: &SpaceProjection) -> Result<(), ProjectionError> {
self.spaces.insert(row.space_id, row.clone());
Ok(())
}
fn upsert_entry(&mut self, row: &EntryProjection) -> Result<(), ProjectionError> {
self.entries.insert(row.entry_id, row.clone());
Ok(())
}
fn upsert_activity(&mut self, row: &ActivityProjection) -> Result<(), ProjectionError> {
self.activities.insert(row.activity_id, row.clone());
Ok(())
}
fn get_actor(&self, actor_id: ActorId) -> Option<ActorProjection> {
self.actors.get(&actor_id).cloned()
}
fn get_space(&self, space_id: SpaceId) -> Option<SpaceProjection> {
self.spaces.get(&space_id).cloned()
}
fn get_entry(&self, entry_id: EntryId) -> Option<EntryProjection> {
self.entries.get(&entry_id).cloned()
}
fn get_activity(&self, activity_id: ActivityId) -> Option<ActivityProjection> {
self.activities.get(&activity_id).cloned()
}
}
pub struct ProjectionRouter {
projections: Vec<Box<dyn Projection>>,
state: ObserverState,
}
impl ProjectionRouter {
#[inline]
#[must_use]
pub fn new() -> Self {
Self {
projections: Vec::new(),
state: ObserverState::Passive,
}
}
pub fn register(&mut self, projection: Box<dyn Projection>) {
self.projections.push(projection);
}
#[inline]
#[must_use]
pub fn state(&self) -> ObserverState {
self.state
}
pub fn promote_to_active(&mut self) -> Result<(), ProjectionError> {
if self.state == ObserverState::Draining {
return Err(ProjectionError::NotActive { state: self.state });
}
self.transition(ObserverState::Active)
}
#[must_use]
pub fn evaluate_auto_promote(
&self,
manifest: &crate::manifest::ManifestSnapshot,
primary_down_duration: core::time::Duration,
health: &crate::hf2_kms::health::MultiChannelHealth,
threshold_ready: bool,
) -> Option<PromotionDecision> {
match manifest.audit.kms_auto_promote.as_str() {
"manual" => Some(PromotionDecision::Wait),
"after_60min" => {
let elapsed_ok = primary_down_duration >= core::time::Duration::from_secs(60 * 60);
let health_ok = health.healthy_count() >= HF2_HEALTH_QUORUM_MIN;
if elapsed_ok && health_ok {
Some(PromotionDecision::Promote)
} else {
Some(PromotionDecision::Wait)
}
}
"threshold_hsm" => {
if threshold_ready {
Some(PromotionDecision::Promote)
} else {
Some(PromotionDecision::Wait)
}
}
_ => None,
}
}
pub fn demote_to_passive(&mut self) -> Result<(), ProjectionError> {
self.transition(ObserverState::Passive)
}
pub fn begin_draining(&mut self) -> Result<(), ProjectionError> {
self.transition(ObserverState::Draining)
}
fn transition(&mut self, next: ObserverState) -> Result<(), ProjectionError> {
for p in &mut self.projections {
p.on_state_change(next)?;
}
self.state = next;
Ok(())
}
pub fn dispatch(
&mut self,
event: &EventRecord,
ctx: &ProjectionContext<'_>,
) -> Result<usize, ProjectionError> {
if self.state != ObserverState::Active {
return Err(ProjectionError::NotActive { state: self.state });
}
let tc = TypeCode(event.type_code);
let mut applied = 0usize;
for p in &mut self.projections {
if !p.observes().contains(&tc) {
continue;
}
if let Some((last_seq, _)) = p.last_applied() {
if event.sequence == last_seq {
continue; }
if event.sequence < last_seq {
return Err(ProjectionError::SequenceBackward {
last: last_seq,
incoming: event.sequence,
});
}
if event.sequence > last_seq.saturating_add(1) {
return Err(ProjectionError::SequenceGap {
last: last_seq,
incoming: event.sequence,
});
}
}
p.on_event(event, ctx)?;
applied += 1;
}
Ok(applied)
}
}
impl Default for ProjectionRouter {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct CrossShellActivityFanout {
observes: [TypeCode; 1],
by_target_shell: HashMap<ShellId, Vec<CrossShellActivity>>,
cursor: Option<ProjectionCursor>,
}
impl Default for CrossShellActivityFanout {
fn default() -> Self {
Self::new()
}
}
impl CrossShellActivityFanout {
#[inline]
#[must_use]
pub fn new() -> Self {
Self {
observes: [TypeCode(CrossShellActivity::TYPE_CODE)],
by_target_shell: HashMap::new(),
cursor: None,
}
}
#[inline]
#[must_use]
pub fn notifications_for(&self, shell: &ShellId) -> &[CrossShellActivity] {
self.by_target_shell
.get(shell)
.map(Vec::as_slice)
.unwrap_or(&[])
}
}
impl Projection for CrossShellActivityFanout {
fn observes(&self) -> &[TypeCode] {
&self.observes
}
fn on_event(
&mut self,
event: &EventRecord,
_ctx: &ProjectionContext<'_>,
) -> Result<(), ProjectionError> {
let notice: CrossShellActivity = postcard::from_bytes(&event.payload)
.map_err(|_| ProjectionError::DecodeFailed("CrossShellActivity payload"))?;
self.by_target_shell
.entry(notice.target_shell_id)
.or_default()
.push(notice);
self.cursor = Some(ProjectionCursor {
sequence: event.sequence,
tick: event.tick,
});
Ok(())
}
fn last_applied(&self) -> Option<(u64, Tick)> {
self.cursor.map(|c| (c.sequence, c.tick))
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use arkhe_forge_core::actor::ActorKind;
use arkhe_forge_core::component::BoundedString;
use arkhe_kernel::abi::EntityId;
use bytes::Bytes;
fn sid(byte: u8) -> ShellId {
ShellId([byte; 16])
}
fn ent(v: u64) -> EntityId {
EntityId::new(v).unwrap()
}
fn make_cross_shell_event(seq: u64, tick: u64, target: ShellId) -> EventRecord {
let notice = CrossShellActivity {
schema_version: 1,
actor: ActorId::new(ent(1)),
target_shell_id: target,
record_shell_id: sid(0xAA),
detected_tick: Tick(tick),
};
EventRecord {
type_code: CrossShellActivity::TYPE_CODE,
sequence: seq,
tick: Tick(tick),
payload: Bytes::from(postcard::to_stdvec(¬ice).unwrap()),
}
}
fn ctx(tick: u64) -> ProjectionContext<'static> {
ProjectionContext::new(Tick(tick), InstanceId::new(1).unwrap())
}
#[test]
fn router_defaults_to_passive() {
let r = ProjectionRouter::new();
assert_eq!(r.state(), ObserverState::Passive);
}
#[test]
fn router_promote_then_demote_then_drain() {
let mut r = ProjectionRouter::new();
r.promote_to_active().unwrap();
assert_eq!(r.state(), ObserverState::Active);
r.demote_to_passive().unwrap();
assert_eq!(r.state(), ObserverState::Passive);
r.begin_draining().unwrap();
assert_eq!(r.state(), ObserverState::Draining);
assert!(r.promote_to_active().is_err());
}
#[test]
fn cross_shell_fanout_routes_to_target_shell_only() {
let mut r = ProjectionRouter::new();
r.promote_to_active().unwrap();
r.register(Box::new(CrossShellActivityFanout::new()));
let target = sid(0x33);
let ev = make_cross_shell_event(0, 100, target);
let applied = r.dispatch(&ev, &ctx(100)).unwrap();
assert_eq!(applied, 1);
}
#[test]
fn dispatcher_skips_projection_with_no_matching_observer() {
let mut r = ProjectionRouter::new();
r.promote_to_active().unwrap();
r.register(Box::new(CrossShellActivityFanout::new()));
let other_event = EventRecord {
type_code: 0x0003_0F02, sequence: 0,
tick: Tick(1),
payload: Bytes::new(),
};
let applied = r.dispatch(&other_event, &ctx(1)).unwrap();
assert_eq!(applied, 0, "non-observed TypeCode must not hit the fanout");
}
#[test]
fn dispatcher_dedups_duplicate_sequence() {
let mut r = ProjectionRouter::new();
r.promote_to_active().unwrap();
r.register(Box::new(CrossShellActivityFanout::new()));
let target = sid(0x10);
let ev = make_cross_shell_event(0, 5, target);
r.dispatch(&ev, &ctx(5)).unwrap();
let applied_again = r.dispatch(&ev, &ctx(5)).unwrap();
assert_eq!(applied_again, 0, "duplicate sequence must no-op");
}
#[test]
fn dispatcher_rejects_gap() {
let mut r = ProjectionRouter::new();
r.promote_to_active().unwrap();
r.register(Box::new(CrossShellActivityFanout::new()));
let target = sid(0x10);
r.dispatch(&make_cross_shell_event(0, 5, target), &ctx(5))
.unwrap();
let err = r
.dispatch(&make_cross_shell_event(5, 6, target), &ctx(6))
.unwrap_err();
assert!(matches!(err, ProjectionError::SequenceGap { .. }));
}
#[test]
fn dispatcher_rejects_backward_sequence() {
let mut r = ProjectionRouter::new();
r.promote_to_active().unwrap();
r.register(Box::new(CrossShellActivityFanout::new()));
let target = sid(0x10);
r.dispatch(&make_cross_shell_event(2, 5, target), &ctx(5))
.unwrap();
let err = r
.dispatch(&make_cross_shell_event(1, 5, target), &ctx(5))
.unwrap_err();
assert!(matches!(err, ProjectionError::SequenceBackward { .. }));
}
#[test]
fn draining_rejects_dispatch() {
let mut r = ProjectionRouter::new();
r.begin_draining().unwrap();
let err = r
.dispatch(&make_cross_shell_event(0, 1, sid(0)), &ctx(1))
.unwrap_err();
assert!(matches!(
err,
ProjectionError::NotActive {
state: ObserverState::Draining
}
));
}
#[test]
fn passive_rejects_dispatch() {
let mut r = ProjectionRouter::new();
assert_eq!(r.state(), ObserverState::Passive);
let err = r
.dispatch(&make_cross_shell_event(0, 1, sid(0)), &ctx(1))
.unwrap_err();
assert!(matches!(
err,
ProjectionError::NotActive {
state: ObserverState::Passive
}
));
}
#[test]
fn demote_to_passive_blocks_subsequent_dispatch() {
let mut r = ProjectionRouter::new();
r.promote_to_active().unwrap();
r.register(Box::new(CrossShellActivityFanout::new()));
r.dispatch(&make_cross_shell_event(0, 5, sid(0x10)), &ctx(5))
.unwrap();
r.demote_to_passive().unwrap();
let err = r
.dispatch(&make_cross_shell_event(1, 6, sid(0x10)), &ctx(6))
.unwrap_err();
assert!(matches!(
err,
ProjectionError::NotActive {
state: ObserverState::Passive
}
));
}
#[test]
fn in_memory_store_roundtrip_actor() {
let mut store = InMemoryProjectionStore::new();
let row = ActorProjection {
schema_version: 1,
actor_id: ActorId::new(ent(42)),
profile: ActorProfile {
schema_version: 1,
shell_id: sid(0x01),
handle: BoundedString::<32>::new("alice").unwrap(),
kind: ActorKind::Human,
created_tick: Tick(1),
},
user_binding: None,
cursor: None,
};
store.upsert_actor(&row).unwrap();
let fetched = store.get_actor(ActorId::new(ent(42))).unwrap();
assert_eq!(fetched, row);
}
#[test]
fn cross_shell_fanout_preserves_shell_partition() {
let mut fanout = CrossShellActivityFanout::new();
let shell_a = sid(0xAA);
let shell_b = sid(0xBB);
fanout
.on_event(&make_cross_shell_event(0, 10, shell_a), &ctx(10))
.unwrap();
fanout
.on_event(&make_cross_shell_event(1, 11, shell_b), &ctx(11))
.unwrap();
assert_eq!(fanout.notifications_for(&shell_a).len(), 1);
assert_eq!(fanout.notifications_for(&shell_b).len(), 1);
assert_eq!(fanout.last_applied(), Some((1, Tick(11))));
}
#[test]
fn projection_cursor_roundtrip() {
let c = ProjectionCursor {
sequence: 5,
tick: Tick(10),
};
let bytes = postcard::to_stdvec(&c).unwrap();
let back: ProjectionCursor = postcard::from_bytes(&bytes).unwrap();
assert_eq!(c, back);
}
#[test]
fn auto_promote_policy_matrix() {
use crate::hf2_kms::health::{Channel, MultiChannelHealth, Status};
use crate::manifest::{
AuditSection, FrontendSection, ManifestSnapshot, RuntimeSection, ShellSection,
};
use core::time::Duration;
fn manifest_with(policy: &str) -> ManifestSnapshot {
ManifestSnapshot {
schema_version: 1,
shell: ShellSection {
shell_id: "test".to_string(),
display_name: "Test".to_string(),
},
runtime: RuntimeSection {
runtime_max: "0.15".to_string(),
runtime_current: "0.13".to_string(),
},
audit: AuditSection {
pii_cipher: "xchacha20-poly1305".to_string(),
dek_backend: "software-kek".to_string(),
kms_auto_promote: policy.to_string(),
signature_class: "ed25519".to_string(),
compliance_tier: 0,
},
frontend: FrontendSection::default(),
}
}
fn healthy_trio() -> MultiChannelHealth {
let mut h = MultiChannelHealth::new(&[
Channel::Default,
Channel::DnsOverHttps,
Channel::StaticIp,
]);
for c in [Channel::Default, Channel::DnsOverHttps, Channel::StaticIp] {
h.set_status(c, Status::Healthy);
}
h
}
fn degraded_trio() -> MultiChannelHealth {
let mut h = MultiChannelHealth::new(&[
Channel::Default,
Channel::DnsOverHttps,
Channel::StaticIp,
]);
h.set_status(Channel::Default, Status::Healthy);
h.set_status(Channel::DnsOverHttps, Status::Failing);
h.set_status(Channel::StaticIp, Status::Failing);
h
}
let r = ProjectionRouter::new();
let healthy = healthy_trio();
let degraded = degraded_trio();
assert_eq!(
r.evaluate_auto_promote(
&manifest_with("manual"),
Duration::from_secs(7200),
&healthy,
true,
),
Some(PromotionDecision::Wait),
);
assert_eq!(
r.evaluate_auto_promote(
&manifest_with("after_60min"),
Duration::from_secs(59 * 60),
&healthy,
false,
),
Some(PromotionDecision::Wait),
);
assert_eq!(
r.evaluate_auto_promote(
&manifest_with("after_60min"),
Duration::from_secs(60 * 60),
°raded,
false,
),
Some(PromotionDecision::Wait),
);
assert_eq!(
r.evaluate_auto_promote(
&manifest_with("after_60min"),
Duration::from_secs(60 * 60),
&healthy,
false,
),
Some(PromotionDecision::Promote),
);
assert_eq!(
r.evaluate_auto_promote(
&manifest_with("threshold_hsm"),
Duration::from_secs(60 * 60),
°raded,
false,
),
Some(PromotionDecision::Wait),
);
assert_eq!(
r.evaluate_auto_promote(
&manifest_with("threshold_hsm"),
Duration::from_secs(0),
°raded,
true,
),
Some(PromotionDecision::Promote),
);
assert!(r
.evaluate_auto_promote(
&manifest_with("unknown"),
Duration::from_secs(86_400),
&healthy,
true,
)
.is_none());
}
}