use std::collections::VecDeque;
use crate::ids::{CommandId, ComponentRef, OpRef, PeerId};
/// Top-level event type: either an infrastructure event or an application event.
///
/// This is the element type queued by [`TypedBus`]; [`NodeEvent::kind`] gives a
/// string label for each value.
#[derive(Clone, Debug)]
pub enum NodeEvent {
/// An infrastructure event; see [`InfraEvent`] for the variants.
Infra(InfraEvent),
/// An application event; see [`AppEvent`] for the variants.
App(AppEvent),
}
impl NodeEvent {
    /// Returns the label identifying what kind of event this is.
    ///
    /// Infrastructure events are labelled with the name of their
    /// [`InfraEvent`] variant (a fixed string). Application events are
    /// labelled with the `name` stored inside the event, borrowed from `self`.
    pub fn kind(&self) -> &str {
        match self {
            // Both application variants carry their own label.
            Self::App(app) => match app {
                AppEvent::Emit { name, .. } | AppEvent::Notify { name } => name.as_str(),
            },
            // Infrastructure variants map one-to-one onto their variant name.
            // The match is exhaustive on purpose: adding a variant must fail
            // to compile here until it is given a label.
            Self::Infra(infra) => match infra {
                InfraEvent::WireResponseLanded { .. } => "WireResponseLanded",
                InfraEvent::OpFailure { .. } => "OpFailure",
                InfraEvent::WireDecodeFailure { .. } => "WireDecodeFailure",
                InfraEvent::WireReceiveError { .. } => "WireReceiveError",
                InfraEvent::AppIngressError { .. } => "AppIngressError",
                InfraEvent::BusOverflow { .. } => "BusOverflow",
                InfraEvent::PeerResolveFailure { .. } => "PeerResolveFailure",
                InfraEvent::PeerSuspect { .. } => "PeerSuspect",
                InfraEvent::PeerDown { .. } => "PeerDown",
                InfraEvent::PeerLive { .. } => "PeerLive",
                InfraEvent::BackoffNoticeSent { .. } => "BackoffNoticeSent",
                InfraEvent::SilentDropActive { .. } => "SilentDropActive",
            },
        }
    }
}
/// Infrastructure-side events, carried by [`NodeEvent::Infra`].
///
/// [`NodeEvent::kind`] labels each variant with its own variant name.
///
/// NOTE(review): nothing in this file emits these events, so the per-variant
/// notes below are read off the variant and field names — confirm against the
/// emitting code before relying on them.
#[derive(Clone, Debug)]
pub enum InfraEvent {
/// Presumably: the wire response for a command has arrived.
WireResponseLanded {
/// Command the event refers to.
cmd_id: CommandId,
},
/// Presumably: the operation `op_ref` failed with `error`.
OpFailure {
/// Operation the event refers to.
op_ref: OpRef,
/// Failure description; see [`OpError`].
error: OpError,
},
/// Presumably: an inbound wire payload could not be decoded.
WireDecodeFailure {
/// Presumably the payload's type hash — TODO confirm.
hash: u64,
/// Presumably the payload size in bytes — TODO confirm the unit.
payload_size: usize,
/// Free-form description of the failure.
detail: String,
},
/// Presumably: receiving a wire payload failed; `kind` classifies the failure.
WireReceiveError {
/// Sending peer, when there is one to report.
src_peer: Option<PeerId>,
/// NOTE(review): meaning of "fill index" is not visible from this file.
fill_index: u32,
/// Presumably the type hash actually seen on the wire — TODO confirm.
actual_hash: u64,
/// Presumably the payload size in bytes — TODO confirm the unit.
payload_size: usize,
/// Failure classification; see [`WireReceiveErrorKind`].
kind: WireReceiveErrorKind,
},
/// Presumably: data entering from the application side was rejected.
AppIngressError {
/// Which ingress path the data came through; see [`AppIngressSource`].
source: AppIngressSource,
/// Presumably the size of the rejected data in bytes.
byte_count: usize,
/// Failure classification; see [`AppIngressErrorKind`].
kind: AppIngressErrorKind,
},
/// Presumably: events were discarded by a bounded bus.
BusOverflow {
/// Presumably the number of discarded events. `TypedBus` keeps such a
/// counter, but the code linking the two is not in this file — confirm.
count: usize,
},
/// Presumably: a peer could not be resolved while handling `op_ref`.
PeerResolveFailure {
/// The peer in question, when there is one to report.
peer: Option<PeerId>,
/// Operation the event refers to.
op_ref: OpRef,
},
/// Presumably: a site has become suspected of having failed.
PeerSuspect {
/// Site the event refers to.
site: crate::ids::NodeSiteId,
/// NOTE(review): looks like a phi-accrual suspicion level — confirm.
phi: f64,
},
/// Presumably: a site has been declared down.
PeerDown {
/// Site the event refers to.
site: crate::ids::NodeSiteId,
/// NOTE(review): looks like a phi-accrual suspicion level — confirm.
phi: f64,
},
/// Presumably: a site is considered live.
PeerLive {
/// Site the event refers to.
site: crate::ids::NodeSiteId,
},
/// Presumably: a backoff notice was sent to `peer`.
BackoffNoticeSent {
/// Peer the event refers to.
peer: PeerId,
/// Why the backoff was requested.
cause: crate::framework::BackoffCause,
/// Minimum backoff; the `_ns` suffix suggests nanoseconds — TODO confirm.
min_backoff_ns: u64,
},
/// Presumably: traffic involving `peer` is being dropped without notice.
SilentDropActive {
/// Peer the event refers to.
peer: PeerId,
},
}
/// Failure classification stored in the `kind` field of
/// [`InfraEvent::WireReceiveError`].
///
/// NOTE(review): the conditions that produce each variant are not visible in
/// this file; the notes below are read off the names — confirm at the producer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WireReceiveErrorKind {
/// Presumably: the received type hash is not known to the receiver.
UnknownTypeHash,
/// Presumably: the received type hash differs from the one expected.
TypeMismatch {
/// The hash that was expected.
expected_hash: u64,
},
/// Presumably: decoding the payload failed.
DecodeFailed {
/// Short textual summary of the decode error.
error_summary: String,
},
/// Presumably: memory for the payload could not be allocated.
AllocationFailed {
/// Presumably the number of bytes requested.
byte_count: usize,
/// Why the allocation failed; see [`AllocFailReason`].
reason: AllocFailReason,
},
/// Presumably: the payload does not fit in the remaining budget.
BudgetExceeded {
/// Presumably the number of bytes requested.
byte_count: usize,
/// Presumably the budget still available when the request was made.
budget_remaining: usize,
},
/// Presumably: a backend failed to materialize the received value.
BackendMaterializeFailed {
/// The backend component involved.
backend_ref: ComponentRef,
/// Short textual summary of the backend's error.
backend_error_summary: String,
},
}
/// Reason attached to the `AllocationFailed` variants of
/// [`WireReceiveErrorKind`] and [`AppIngressErrorKind`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AllocFailReason {
/// Presumably: no heap memory was available — confirm at the producer.
HeapExhausted,
/// Presumably: a single item was larger than the per-item limit.
PerItemCapExceeded {
/// The limit that applied.
cap: usize,
},
}
/// Origin recorded in the `source` field of [`InfraEvent::AppIngressError`].
///
/// NOTE(review): the ingress paths themselves are not in this file; the notes
/// below are read off the names — confirm at the producer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AppIngressSource {
/// Presumably: the data arrived as an application event.
AppEvent {
/// Name of the module involved.
module: String,
/// Name of the input involved.
input: String,
},
/// Presumably: the data arrived through an invocation.
Invoke {
/// Name of the module involved.
module: String,
/// Presumably the number of inputs passed to the invocation.
input_count: usize,
},
/// Presumably: the data arrived as the completion of a command.
Completion {
/// The command involved.
command: CommandId,
},
}
/// Failure classification stored in the `kind` field of
/// [`InfraEvent::AppIngressError`].
///
/// NOTE(review): [`AllocFailReason`] also has a `PerItemCapExceeded { cap }`
/// variant, so a per-item cap violation can be expressed either as
/// `AllocationFailed { reason: PerItemCapExceeded { .. } }` or as the top-level
/// `PerItemCapExceeded` here — confirm which form producers use.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AppIngressErrorKind {
/// Presumably: memory for the data could not be allocated.
AllocationFailed {
/// Why the allocation failed.
reason: AllocFailReason,
},
/// Presumably: the data does not fit in the remaining budget.
BudgetExceeded {
/// Presumably the budget still available when the request was made.
budget_remaining: usize,
},
/// Presumably: a single item was larger than the per-item limit.
PerItemCapExceeded {
/// The limit that applied.
cap: usize,
},
}
/// Application-level events, carried by [`NodeEvent::App`].
///
/// [`NodeEvent::kind`] reports the `name` field of either variant.
///
/// The constructors [`AppEvent::emit`] and [`AppEvent::notify`] reject names
/// that start with a framework-reserved prefix. The variants and their fields
/// are public, so building a value directly with struct syntax does not go
/// through that check.
#[derive(Clone, Debug)]
pub enum AppEvent {
/// A named event that carries a byte payload.
Emit {
/// Event name (topic).
name: String,
/// Payload bytes. NOTE(review): the encoding is not defined in this file.
value_bytes: Vec<u8>,
},
/// A named event with no payload.
Notify {
/// Event name (topic).
name: String,
},
}
// Topic prefixes that `check_reserved` rejects. `AppEvent::emit` and
// `AppEvent::notify` both call `check_reserved`, so neither constructor
// accepts a name starting with one of these. The comparison is a plain
// `str::starts_with`, hence case-sensitive, and the trailing dot is part of
// the prefix.
const RESERVED_PREFIX_BB: &str = "bb.";
const RESERVED_PREFIX_FRAMEWORK: &str = "ai.bytesandbrains.";
/// Error returned by [`AppEvent::emit`] and [`AppEvent::notify`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AppEventError {
/// The requested topic starts with a framework-reserved prefix.
ReservedPrefix {
/// The rejected topic, copied in full.
topic: String,
/// The reserved prefix that `topic` starts with.
prefix: &'static str,
},
}
/// Human-readable rendering of an [`AppEventError`].
impl std::fmt::Display for AppEventError {
    /// Writes a one-line message naming the rejected topic and the reserved
    /// prefix it collided with.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum, so the destructuring is irrefutable. If a
        // variant is added later this line stops compiling, which is the
        // prompt to extend the message.
        let Self::ReservedPrefix { topic, prefix } = self;
        write!(
            f,
            "AppEvent topic `{topic}` collides with the framework-reserved prefix `{prefix}`",
        )
    }
}

/// Marks [`AppEventError`] as a standard error type; it has no source error.
impl std::error::Error for AppEventError {}
impl AppEvent {
    /// Builds an [`AppEvent::Emit`] carrying `value_bytes` under `name`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `check_reserved` when `name` starts with
    /// a framework-reserved prefix.
    pub fn emit(name: impl Into<String>, value_bytes: Vec<u8>) -> Result<Self, AppEventError> {
        let topic: String = name.into();
        // The check only borrows the topic, so it can be moved into the
        // event once the check has passed.
        check_reserved(&topic).map(|()| Self::Emit {
            name: topic,
            value_bytes,
        })
    }

    /// Builds a payload-free [`AppEvent::Notify`] named `name`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `check_reserved` when `name` starts with
    /// a framework-reserved prefix.
    pub fn notify(name: impl Into<String>) -> Result<Self, AppEventError> {
        let topic: String = name.into();
        check_reserved(&topic).map(|()| Self::Notify { name: topic })
    }
}
/// Rejects topics that start with a framework-reserved prefix.
///
/// The prefixes are tried in declaration order (`RESERVED_PREFIX_BB`, then
/// `RESERVED_PREFIX_FRAMEWORK`) using a case-sensitive `str::starts_with`.
///
/// # Errors
///
/// Returns [`AppEventError::ReservedPrefix`] holding a copy of `topic` and the
/// first prefix that matched.
fn check_reserved(topic: &str) -> Result<(), AppEventError> {
    let reserved = [RESERVED_PREFIX_BB, RESERVED_PREFIX_FRAMEWORK];
    let hit = reserved
        .iter()
        .copied()
        .find(|&candidate| topic.starts_with(candidate));
    match hit {
        Some(prefix) => Err(AppEventError::ReservedPrefix {
            topic: topic.to_owned(),
            prefix,
        }),
        None => Ok(()),
    }
}
/// Coarse category of an [`OpError`], stored in its `kind` field and printed
/// with `{:?}` by `OpError`'s `Display` impl.
///
/// NOTE(review): the conditions that map to each variant are not defined in
/// this file; the variant names are the only description available here.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OpErrorKind {
TypeMismatch,
MissingSlot,
NotRegistered,
ExecutionFailed,
RemoteFailed,
Timeout,
BadInput,
Cooldown,
/// Catch-all category; this is the value returned by `OpErrorKind::default()`.
Other,
}
impl Default for OpErrorKind {
fn default() -> Self {
Self::Other
}
}
/// Description of a failed operation; carried by [`InfraEvent::OpFailure`].
///
/// The derived `Default` is `kind: OpErrorKind::Other` with an empty `reason`
/// and an empty `detail`.
#[derive(Clone, Debug, Default)]
pub struct OpError {
/// Coarse category of the failure.
pub kind: OpErrorKind,
/// Static reason label. When empty, the `Display` output leaves out the
/// ` reason=` part entirely.
pub reason: &'static str,
/// Free-form description, printed after the `]: ` separator by `Display`.
pub detail: String,
}
/// Human-readable rendering of an [`OpError`].
impl std::fmt::Display for OpError {
    /// Writes `op error[<kind>]: <detail>`, or
    /// `op error[<kind> reason=<reason>]: <detail>` when `reason` is non-empty.
    /// `<kind>` is the `Debug` form of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            kind,
            reason,
            detail,
        } = self;
        // The reason label is optional; only the long form mentions it.
        if !reason.is_empty() {
            return write!(f, "op error[{:?} reason={}]: {}", kind, reason, detail);
        }
        write!(f, "op error[{:?}]: {}", kind, detail)
    }
}

/// Marks [`OpError`] as a standard error type; it has no source error.
impl std::error::Error for OpError {}
/// FIFO queue of [`NodeEvent`]s with an optional capacity bound.
///
/// When a bound is set, [`TypedBus::publish`] keeps the newest events and
/// discards the oldest ones, counting every discarded event. The count is read
/// and reset with [`TypedBus::take_dropped_count`].
#[derive(Default)]
pub struct TypedBus {
    /// Pending events, oldest at the front.
    queue: VecDeque<NodeEvent>,
    /// Maximum number of queued events; `None` means unbounded.
    cap: Option<usize>,
    /// Events discarded by `publish` since the counter was last taken.
    /// Despite the name, `drain` does not reset it; only
    /// `take_dropped_count` does.
    dropped_since_last_drain: usize,
}

impl TypedBus {
    /// Creates an empty, unbounded bus.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty bus holding at most `cap` events (`None` = unbounded).
    pub fn with_cap(cap: Option<usize>) -> Self {
        Self {
            queue: VecDeque::new(),
            cap,
            dropped_since_last_drain: 0,
        }
    }

    /// Replaces the capacity bound.
    ///
    /// Events already queued are not evicted here; if the queue is over the
    /// new bound, the next [`publish`](Self::publish) trims it.
    pub fn set_cap(&mut self, cap: Option<usize>) {
        self.cap = cap;
    }

    /// Appends `event`, evicting the oldest queued events if the bus is full.
    ///
    /// With a bound of `n > 0`, the oldest events are discarded until fewer
    /// than `n` remain, then `event` is queued, so at most `n` are ever held.
    /// With a bound of `0` nothing can be held: any backlog is discarded and
    /// `event` itself is dropped. Every discarded event, including a dropped
    /// incoming one, is added to the dropped counter.
    pub fn publish(&mut self, event: NodeEvent) {
        if let Some(cap) = self.cap {
            // Stop once the queue is empty: with `cap == 0` the length test
            // alone is always true and would otherwise spin forever.
            while self.queue.len() >= cap && self.queue.pop_front().is_some() {
                self.dropped_since_last_drain += 1;
            }
            if cap == 0 {
                // No room for even one event; account for the incoming one.
                self.dropped_since_last_drain += 1;
                return;
            }
        }
        self.queue.push_back(event);
    }

    /// Removes and returns all queued events, oldest first.
    ///
    /// The dropped counter is left untouched.
    pub fn drain(&mut self) -> Vec<NodeEvent> {
        self.queue.drain(..).collect()
    }

    /// Returns the number of events discarded since the previous call and
    /// resets the counter to zero.
    pub fn take_dropped_count(&mut self) -> usize {
        std::mem::take(&mut self.dropped_since_last_drain)
    }

    /// Returns `true` when no events are queued.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns the number of queued events.
    pub fn len(&self) -> usize {
        self.queue.len()
    }
}