use std::time::Duration;
use thiserror::Error;
use crate::identity::TraceId;
use super::events::{
ChannelAuditEvent, ModerationAuditEvent, SubstrateAuditEvent, UserAuditEvent,
};
use super::sinks::{AuditError, SinkKind};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CompositeOpId([u8; 16]);
impl CompositeOpId {
#[must_use]
pub const fn from_bytes(bytes: [u8; 16]) -> Self {
CompositeOpId(bytes)
}
#[must_use]
pub const fn as_bytes(&self) -> &[u8; 16] {
&self.0
}
}
pub const TRACKER_SHARDS: usize = 16;
pub const TRACKER_GRACE_WINDOW_DEFAULT: Duration = Duration::from_millis(100);
pub const TRACKER_GRACE_WINDOW_MAX: Duration = Duration::from_secs(1);
#[derive(Debug)]
pub struct CompositeAuditScope {
trace_id: TraceId,
composite_op_id: CompositeOpId,
queued_events: std::sync::Mutex<Vec<QueuedEvent>>,
sinks_committed: std::sync::Mutex<smallvec::SmallVec<[SinkKind; 4]>>,
}
#[derive(Debug)]
enum QueuedEvent {
User(UserAuditEvent),
Channel(ChannelAuditEvent),
Substrate(SubstrateAuditEvent),
Moderation(ModerationAuditEvent),
}
impl QueuedEvent {
fn class(&self) -> SinkKind {
match self {
QueuedEvent::User(_) => SinkKind::User,
QueuedEvent::Channel(_) => SinkKind::Channel,
QueuedEvent::Substrate(_) => SinkKind::Substrate,
QueuedEvent::Moderation(_) => SinkKind::Moderation,
}
}
}
const COMMIT_PRIORITY_ORDER: &[SinkKind] = &[
SinkKind::Substrate,
SinkKind::Moderation,
SinkKind::User,
SinkKind::Channel,
];
impl CompositeAuditScope {
#[must_use]
pub(crate) fn new_internal(trace_id: TraceId, composite_op_id: CompositeOpId) -> Self {
CompositeAuditScope {
trace_id,
composite_op_id,
queued_events: std::sync::Mutex::new(Vec::new()),
sinks_committed: std::sync::Mutex::new(smallvec::SmallVec::new()),
}
}
#[must_use]
pub fn trace_id(&self) -> TraceId {
self.trace_id
}
#[must_use]
pub fn composite_op_id(&self) -> CompositeOpId {
self.composite_op_id
}
pub fn emit_user(&self, event: UserAuditEvent) {
self.queued_events
.lock()
.expect("composite scope queue poisoned")
.push(QueuedEvent::User(event));
}
pub fn emit_channel(&self, event: ChannelAuditEvent) {
self.queued_events
.lock()
.expect("composite scope queue poisoned")
.push(QueuedEvent::Channel(event));
}
pub fn emit_substrate(&self, event: SubstrateAuditEvent) {
self.queued_events
.lock()
.expect("composite scope queue poisoned")
.push(QueuedEvent::Substrate(event));
}
pub fn emit_moderation(&self, event: ModerationAuditEvent) {
self.queued_events
.lock()
.expect("composite scope queue poisoned")
.push(QueuedEvent::Moderation(event));
}
pub(in crate::audit::composite) fn drain_queued_events(&self) -> Vec<QueuedEvent> {
let mut events = self
.queued_events
.lock()
.expect("composite scope queue poisoned")
.drain(..)
.collect::<Vec<_>>();
events.sort_by_key(|e| {
COMMIT_PRIORITY_ORDER
.iter()
.position(|c| *c == e.class())
.unwrap_or(usize::MAX)
});
events
}
pub(in crate::audit::composite) fn record_committed(&self, class: SinkKind) {
let mut g = self
.sinks_committed
.lock()
.expect("composite scope committed-set poisoned");
if !g.contains(&class) {
g.push(class);
}
}
pub(in crate::audit::composite) fn committed_snapshot(&self) -> smallvec::SmallVec<[SinkKind; 4]> {
self.sinks_committed
.lock()
.expect("composite scope committed-set poisoned")
.clone()
}
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum CompositeAuditError {
#[error("composite commit failed at {class:?} sink: {source}")]
SinkCommitFailed {
class: SinkKind,
source: AuditError,
},
#[error("composite rollback dispatch failed at {class:?} sink: {source}")]
RollbackDispatchFailed {
class: SinkKind,
source: AuditError,
},
#[error("composite inconsistency unrecoverable (fallback sink panicked)")]
InconsistencyUnrecoverable,
#[error("composite tracker full")]
TrackerFull,
}
#[derive(Debug, Clone)]
pub struct CompositeRollbackMarker {
pub(in crate::audit::composite) trace_id: TraceId,
pub(in crate::audit::composite) composite_op_id: CompositeOpId,
pub(in crate::audit::composite) failing_sink: SinkKind,
}
impl CompositeRollbackMarker {
#[must_use]
pub(in crate::audit::composite) fn new_internal(
trace_id: TraceId,
composite_op_id: CompositeOpId,
failing_sink: SinkKind,
) -> Self {
CompositeRollbackMarker {
trace_id,
composite_op_id,
failing_sink,
}
}
}
pub(in crate::audit) trait SinkDispatcher {
fn dispatch(
&self,
sink: SinkKind,
event: CompositeRollbackMarker,
) -> Result<(), AuditError>;
}
pub(in crate::audit) struct AuditSinksDispatcher<'a, 'b> {
pub(in crate::audit) sinks: &'a crate::ingress::AuditSinks<'b>,
}
impl<'a, 'b> SinkDispatcher for AuditSinksDispatcher<'a, 'b> {
fn dispatch(
&self,
sink: SinkKind,
marker: CompositeRollbackMarker,
) -> Result<(), AuditError> {
let at = std::time::SystemTime::now();
match sink {
SinkKind::User => self.sinks.user.record(UserAuditEvent::CompositeRollbackMarker {
trace_id: marker.trace_id,
composite_op_id: marker.composite_op_id,
failing_sink: marker.failing_sink,
at,
}),
SinkKind::Channel => {
self.sinks.channel.record(ChannelAuditEvent::CompositeRollbackMarker {
trace_id: marker.trace_id,
composite_op_id: marker.composite_op_id,
failing_sink: marker.failing_sink,
at,
})
}
SinkKind::Substrate => {
self.sinks.substrate.record(SubstrateAuditEvent::CompositeRollbackMarker {
trace_id: marker.trace_id,
composite_op_id: marker.composite_op_id,
failing_sink: marker.failing_sink,
at,
})
}
SinkKind::Moderation => {
self.sinks.moderation.record(ModerationAuditEvent::CompositeRollbackMarker {
trace_id: marker.trace_id,
composite_op_id: marker.composite_op_id,
failing_sink: marker.failing_sink,
at,
})
}
}
}
}
pub(in crate::audit::composite) fn emit_rollback_marker(
scope: &CompositeAuditScope,
sink_dispatch: &dyn SinkDispatcher,
target_sink: SinkKind,
failing_sink: SinkKind,
) -> Result<(), AuditError> {
let marker = CompositeRollbackMarker::new_internal(
scope.trace_id,
scope.composite_op_id,
failing_sink,
);
sink_dispatch.dispatch(target_sink, marker)
}
pub async fn composite_audit<F, R, E>(
trace_id: TraceId,
sinks: &crate::ingress::AuditSinks<'_>,
op: F,
) -> Result<R, E>
where
F: AsyncFnOnce(&CompositeAuditScope) -> Result<R, E>,
E: From<CompositeAuditError>,
{
let composite_op_id = generate_composite_op_id();
let scope = CompositeAuditScope::new_internal(trace_id, composite_op_id);
let op_result = op(&scope).await;
let returned = match op_result {
Ok(r) => r,
Err(e) => {
return Err(e);
}
};
let queued = scope.drain_queued_events();
let dispatcher = AuditSinksDispatcher { sinks };
for event in queued {
let class = event.class();
let commit_result = match event {
QueuedEvent::User(e) => sinks.user.record(e),
QueuedEvent::Channel(e) => sinks.channel.record(e),
QueuedEvent::Substrate(e) => sinks.substrate.record(e),
QueuedEvent::Moderation(e) => sinks.moderation.record(e),
};
match commit_result {
Ok(()) => scope.record_committed(class),
Err(commit_err) => {
handle_rollback(&scope, &dispatcher, sinks, class, commit_err).await
.map_err(E::from)?;
unreachable!("handle_rollback always returns an error");
}
}
}
Ok(returned)
}
fn generate_composite_op_id() -> CompositeOpId {
let mut bytes = [0u8; 16];
getrandom::getrandom(&mut bytes)
.expect("OS CSPRNG must be available for composite_op_id generation");
CompositeOpId::from_bytes(bytes)
}
async fn handle_rollback(
scope: &CompositeAuditScope,
dispatcher: &AuditSinksDispatcher<'_, '_>,
sinks: &crate::ingress::AuditSinks<'_>,
failing_class: SinkKind,
originating_err: AuditError,
) -> Result<(), CompositeAuditError> {
let committed = scope.committed_snapshot();
let rollback_targets: Vec<SinkKind> = COMMIT_PRIORITY_ORDER
.iter()
.rev()
.filter(|c| committed.contains(c))
.copied()
.collect();
for target in rollback_targets {
if let Err(rollback_err) =
emit_rollback_marker(scope, dispatcher, target, failing_class)
{
let escalation_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
sinks.fallback.record_composite_failure(
scope.trace_id,
scope.composite_op_id,
&committed[..],
&[failing_class],
std::time::SystemTime::now(),
);
}));
return match escalation_result {
Ok(()) => Err(CompositeAuditError::RollbackDispatchFailed {
class: target,
source: rollback_err,
}),
Err(_) => Err(CompositeAuditError::InconsistencyUnrecoverable),
};
}
}
Err(CompositeAuditError::SinkCommitFailed {
class: failing_class,
source: originating_err,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn tracker_grace_window_default_pinned() {
assert_eq!(TRACKER_GRACE_WINDOW_DEFAULT, Duration::from_millis(100));
}
#[test]
fn tracker_grace_window_max_pinned_at_1s() {
assert_eq!(TRACKER_GRACE_WINDOW_MAX, Duration::from_secs(1));
assert!(TRACKER_GRACE_WINDOW_MAX >= TRACKER_GRACE_WINDOW_DEFAULT);
}
#[test]
fn tracker_shards_pinned_at_16() {
assert_eq!(TRACKER_SHARDS, 16);
}
use crate::audit::bounded_string::BoundedString;
use crate::audit::events::{
ChannelAuditEvent, FallbackAuditEvent, ModerationAuditEvent, ModeratorRationale,
SubstrateAuditEvent, UserAuditEvent, MAX_RATIONALE_LEN,
};
use crate::audit::sinks::{
ChannelAuditSink, FallbackAuditSink, ModerationAuditSink, SubstrateAuditSink,
UserAuditSink,
};
use crate::authority::predicate::BindOutcomeRepr;
use crate::authority::ModerationCaseId;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::Mutex as StdMutex;
struct MockSink<E: Clone + Send + Sync + 'static> {
captured: StdMutex<Vec<E>>,
call_count: AtomicUsize,
fail_on_call: Option<usize>,
}
impl<E: Clone + Send + Sync + 'static> MockSink<E> {
fn new(fail_on_call: Option<usize>) -> Self {
MockSink {
captured: StdMutex::new(Vec::new()),
call_count: AtomicUsize::new(0),
fail_on_call,
}
}
fn captured(&self) -> Vec<E> {
self.captured.lock().unwrap().clone()
}
fn record_inner(&self, event: E) -> Result<(), AuditError> {
let n = self.call_count.fetch_add(1, AtomicOrdering::SeqCst) + 1;
if Some(n) == self.fail_on_call {
return Err(AuditError::Unavailable);
}
self.captured.lock().unwrap().push(event);
Ok(())
}
}
impl UserAuditSink for MockSink<UserAuditEvent> {
fn record(&self, event: UserAuditEvent) -> Result<(), AuditError> {
self.record_inner(event)
}
}
impl ChannelAuditSink for MockSink<ChannelAuditEvent> {
fn record(&self, event: ChannelAuditEvent) -> Result<(), AuditError> {
self.record_inner(event)
}
}
impl SubstrateAuditSink for MockSink<SubstrateAuditEvent> {
fn record(&self, event: SubstrateAuditEvent) -> Result<(), AuditError> {
self.record_inner(event)
}
}
impl ModerationAuditSink for MockSink<ModerationAuditEvent> {
fn record(&self, event: ModerationAuditEvent) -> Result<(), AuditError> {
self.record_inner(event)
}
}
type FallbackCapture = (TraceId, CompositeOpId, Vec<SinkKind>, Vec<SinkKind>);
struct MockFallback {
captured: StdMutex<Vec<FallbackCapture>>,
panic_on_call: bool,
}
impl MockFallback {
fn new(panic_on_call: bool) -> Self {
MockFallback {
captured: StdMutex::new(Vec::new()),
panic_on_call,
}
}
fn captured_count(&self) -> usize {
self.captured.lock().unwrap().len()
}
}
impl FallbackAuditSink for MockFallback {
fn record_panic(
&self,
_sink: SinkKind,
_trace_id: TraceId,
_capability: crate::authority::capability::CapabilityKind,
_at: std::time::SystemTime,
) {
}
fn record_composite_failure(
&self,
trace_id: TraceId,
composite_op_id: CompositeOpId,
sinks_committed: &[SinkKind],
sinks_failed: &[SinkKind],
_at: std::time::SystemTime,
) {
if self.panic_on_call {
panic!("MockFallback configured to panic on record_composite_failure");
}
self.captured.lock().unwrap().push((
trace_id,
composite_op_id,
sinks_committed.to_vec(),
sinks_failed.to_vec(),
));
}
fn record_event(&self, _event: FallbackAuditEvent) {}
}
fn sample_did() -> crate::proto::Did {
crate::proto::Did::new("did:plc:phase7btest").unwrap()
}
fn sample_target_repr() -> crate::target::TargetRepresentation {
crate::target::TargetRepresentation::structural_only(
crate::target::StructuralRepresentation::Resource {
did: sample_did(),
nsid: crate::Nsid::new("tools.kryphocron.feed.postPrivate").unwrap(),
},
)
}
fn sample_service_identity() -> crate::identity::ServiceIdentity {
crate::identity::ServiceIdentity::new_internal(
sample_did(),
crate::identity::KeyId::from_bytes([0u8; 32]),
crate::identity::PublicKey {
algorithm: crate::identity::SignatureAlgorithm::Ed25519,
bytes: [0u8; 32],
},
None,
)
}
fn sample_user_event() -> UserAuditEvent {
UserAuditEvent::CapabilityBound {
trace_id: TraceId::from_bytes([0xA1; 16]),
requester: sample_did(),
subject_repr: sample_target_repr(),
capability: crate::authority::capability::CapabilityKind::ViewPrivate,
outcome: BindOutcomeRepr::Success,
attribution: crate::ingress::AttributionChain::empty(),
at: std::time::SystemTime::UNIX_EPOCH,
}
}
fn sample_channel_event() -> ChannelAuditEvent {
ChannelAuditEvent::ChannelClosed {
trace_id: TraceId::from_bytes([0xC1; 16]),
peer: sample_service_identity(),
session_digest: crate::identity::SessionDigest::from_bytes([0u8; 32]),
cause: crate::audit::events::ChannelCloseCause::CleanClose,
at: std::time::SystemTime::UNIX_EPOCH,
}
}
fn sample_substrate_event() -> SubstrateAuditEvent {
SubstrateAuditEvent::ScopeBound {
trace_id: TraceId::from_bytes([0x51; 16]),
service: sample_service_identity(),
scope_repr: sample_target_repr(),
capability: crate::authority::capability::CapabilityKind::ScanShard,
outcome: BindOutcomeRepr::Success,
payload_completeness: crate::audit::PayloadCompleteness::PartialV01,
at: std::time::SystemTime::UNIX_EPOCH,
}
}
fn sample_moderation_event() -> ModerationAuditEvent {
ModerationAuditEvent::ModeratorInspected {
trace_id: TraceId::from_bytes([0xD1; 16]),
moderator: sample_did(),
case: ModerationCaseId::from_bytes([0u8; 16]),
target_repr: sample_target_repr(),
rationale: ModeratorRationale::Declared(
BoundedString::<MAX_RATIONALE_LEN>::new("test").unwrap(),
),
payload_completeness: crate::audit::PayloadCompleteness::PartialV01,
at: std::time::SystemTime::UNIX_EPOCH,
}
}
#[derive(Debug)]
enum TestError {
Composite(CompositeAuditError),
OpSpecific(&'static str),
}
impl From<CompositeAuditError> for TestError {
fn from(e: CompositeAuditError) -> Self {
TestError::Composite(e)
}
}
fn build_sinks<'a>(
user: &'a MockSink<UserAuditEvent>,
channel: &'a MockSink<ChannelAuditEvent>,
substrate: &'a MockSink<SubstrateAuditEvent>,
moderation: &'a MockSink<ModerationAuditEvent>,
fallback: &'a MockFallback,
) -> crate::ingress::AuditSinks<'a> {
static NO_INSPECTION: crate::authority::NoInspectionNotifications =
crate::authority::NoInspectionNotifications;
static NO_CORRELATION_KEY: crate::identity::CorrelationKey =
crate::identity::CorrelationKey::from_bytes([0u8; 32]);
crate::ingress::AuditSinks {
user,
channel,
substrate,
moderation,
fallback,
inspection_queue: &NO_INSPECTION,
correlation_key: &NO_CORRELATION_KEY,
}
}
#[tokio::test]
async fn happy_path_commits_all_queued_events() {
let user = MockSink::<UserAuditEvent>::new(None);
let channel = MockSink::<ChannelAuditEvent>::new(None);
let substrate = MockSink::<SubstrateAuditEvent>::new(None);
let moderation = MockSink::<ModerationAuditEvent>::new(None);
let fallback = MockFallback::new(false);
let sinks = build_sinks(&user, &channel, &substrate, &moderation, &fallback);
let result: Result<u32, TestError> = composite_audit(
TraceId::from_bytes([0xFF; 16]),
&sinks,
async |scope| {
scope.emit_user(sample_user_event());
scope.emit_channel(sample_channel_event());
scope.emit_substrate(sample_substrate_event());
scope.emit_moderation(sample_moderation_event());
Ok(42)
},
)
.await;
assert!(matches!(result, Ok(42)));
assert_eq!(user.captured().len(), 1);
assert_eq!(channel.captured().len(), 1);
assert_eq!(substrate.captured().len(), 1);
assert_eq!(moderation.captured().len(), 1);
assert_eq!(fallback.captured_count(), 0);
}
#[tokio::test]
async fn op_failure_returns_op_error_unchanged_no_emit() {
let user = MockSink::<UserAuditEvent>::new(None);
let channel = MockSink::<ChannelAuditEvent>::new(None);
let substrate = MockSink::<SubstrateAuditEvent>::new(None);
let moderation = MockSink::<ModerationAuditEvent>::new(None);
let fallback = MockFallback::new(false);
let sinks = build_sinks(&user, &channel, &substrate, &moderation, &fallback);
let result: Result<u32, TestError> = composite_audit(
TraceId::from_bytes([0; 16]),
&sinks,
async |scope| {
scope.emit_user(sample_user_event());
Err(TestError::OpSpecific("op rejected"))
},
)
.await;
assert!(matches!(result, Err(TestError::OpSpecific("op rejected"))));
assert!(user.captured().is_empty());
assert!(channel.captured().is_empty());
assert_eq!(fallback.captured_count(), 0);
}
#[tokio::test]
async fn channel_commit_failure_rolls_back_user() {
let user = MockSink::<UserAuditEvent>::new(None);
let channel = MockSink::<ChannelAuditEvent>::new(Some(1));
let substrate = MockSink::<SubstrateAuditEvent>::new(None);
let moderation = MockSink::<ModerationAuditEvent>::new(None);
let fallback = MockFallback::new(false);
let sinks = build_sinks(&user, &channel, &substrate, &moderation, &fallback);
let result: Result<(), TestError> = composite_audit(
TraceId::from_bytes([0x33; 16]),
&sinks,
async |scope| {
scope.emit_user(sample_user_event());
scope.emit_channel(sample_channel_event());
Ok(())
},
)
.await;
assert!(matches!(
result,
Err(TestError::Composite(CompositeAuditError::SinkCommitFailed {
class: SinkKind::Channel,
..
}))
));
let user_events = user.captured();
assert_eq!(user_events.len(), 2, "user sink should have op event + rollback marker");
assert!(matches!(
user_events[1],
UserAuditEvent::CompositeRollbackMarker { failing_sink: SinkKind::Channel, .. }
));
assert_eq!(fallback.captured_count(), 0);
}
#[tokio::test]
async fn channel_failure_after_three_commits_rolls_back_three() {
let user = MockSink::<UserAuditEvent>::new(None);
let channel = MockSink::<ChannelAuditEvent>::new(Some(1));
let substrate = MockSink::<SubstrateAuditEvent>::new(None);
let moderation = MockSink::<ModerationAuditEvent>::new(None);
let fallback = MockFallback::new(false);
let sinks = build_sinks(&user, &channel, &substrate, &moderation, &fallback);
let result: Result<(), TestError> = composite_audit(
TraceId::from_bytes([0x44; 16]),
&sinks,
async |scope| {
scope.emit_user(sample_user_event());
scope.emit_channel(sample_channel_event());
scope.emit_substrate(sample_substrate_event());
scope.emit_moderation(sample_moderation_event());
Ok(())
},
)
.await;
assert!(matches!(
result,
Err(TestError::Composite(CompositeAuditError::SinkCommitFailed {
class: SinkKind::Channel,
..
}))
));
assert_eq!(user.captured().len(), 2);
assert_eq!(substrate.captured().len(), 2);
assert_eq!(moderation.captured().len(), 2);
assert_eq!(channel.captured().len(), 0); assert_eq!(fallback.captured_count(), 0);
}
#[tokio::test]
async fn rollback_dispatch_failure_escalates_to_fallback() {
let user = MockSink::<UserAuditEvent>::new(Some(2));
let channel = MockSink::<ChannelAuditEvent>::new(Some(1));
let substrate = MockSink::<SubstrateAuditEvent>::new(None);
let moderation = MockSink::<ModerationAuditEvent>::new(None);
let fallback = MockFallback::new(false);
let sinks = build_sinks(&user, &channel, &substrate, &moderation, &fallback);
let result: Result<(), TestError> = composite_audit(
TraceId::from_bytes([0x55; 16]),
&sinks,
async |scope| {
scope.emit_user(sample_user_event());
scope.emit_channel(sample_channel_event());
Ok(())
},
)
.await;
assert!(matches!(
result,
Err(TestError::Composite(CompositeAuditError::RollbackDispatchFailed {
class: SinkKind::User,
..
}))
));
assert_eq!(fallback.captured_count(), 1);
}
#[tokio::test]
async fn fallback_panic_returns_inconsistency_unrecoverable() {
let user = MockSink::<UserAuditEvent>::new(Some(2));
let channel = MockSink::<ChannelAuditEvent>::new(Some(1));
let substrate = MockSink::<SubstrateAuditEvent>::new(None);
let moderation = MockSink::<ModerationAuditEvent>::new(None);
let fallback = MockFallback::new(true); let sinks = build_sinks(&user, &channel, &substrate, &moderation, &fallback);
let result: Result<(), TestError> = composite_audit(
TraceId::from_bytes([0x66; 16]),
&sinks,
async |scope| {
scope.emit_user(sample_user_event());
scope.emit_channel(sample_channel_event());
Ok(())
},
)
.await;
assert!(matches!(
result,
Err(TestError::Composite(CompositeAuditError::InconsistencyUnrecoverable))
));
}
#[tokio::test]
async fn class_priority_ordering_substrate_first_channel_last() {
let user = MockSink::<UserAuditEvent>::new(Some(1));
let channel = MockSink::<ChannelAuditEvent>::new(None);
let substrate = MockSink::<SubstrateAuditEvent>::new(None);
let moderation = MockSink::<ModerationAuditEvent>::new(None);
let fallback = MockFallback::new(false);
let sinks = build_sinks(&user, &channel, &substrate, &moderation, &fallback);
let _result: Result<(), TestError> = composite_audit(
TraceId::from_bytes([0x77; 16]),
&sinks,
async |scope| {
scope.emit_channel(sample_channel_event());
scope.emit_user(sample_user_event());
scope.emit_substrate(sample_substrate_event());
scope.emit_moderation(sample_moderation_event());
Ok(())
},
)
.await;
assert_eq!(substrate.captured().len(), 2, "substrate: op event + rollback marker");
assert_eq!(moderation.captured().len(), 2, "moderation: op event + rollback marker");
assert_eq!(user.captured().len(), 0, "user: failed on op event; not in committed-set; no rollback marker");
assert_eq!(channel.captured().len(), 0, "channel queued last in priority order; not reached");
}
}