use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use ed25519_dalek::SigningKey;
use rand_core::OsRng;
use crate::buffer::PayloadStorage;
use crate::config::OmnimeshMode;
use crate::envelope::{Did, EnvelopeHeader, MessageId, PayloadType, Priority, SignedEnvelope};
use crate::payload::{EnvelopePayload, decode_payload, encode_payload};
use crate::runtime::RoutingTable;
use crate::runtime::security::SecurityLayer;
use crate::runtime::transport::TransportLayer;
use crate::runtime::transport::interface::DEFAULT_PAYLOAD_CAPACITY;
/// A message that the background poller accepted and placed in the client's inbox.
#[derive(Debug, Clone)]
pub struct ReceivedMessage {
/// DID copied from the `sender_did` field of the received envelope's header.
pub sender: Did,
/// Payload decoded from the envelope's payload bytes.
pub payload: EnvelopePayload,
/// Local system time at which the poller processed the envelope, in microseconds
/// since the Unix epoch (0 if the system clock reads earlier than the epoch).
pub received_at_us: u64,
}
/// Settings used when constructing an [`OmnimeshClient`].
#[derive(Debug, Clone)]
pub struct ClientConfig {
/// Operating mode handed to the transport and security layers at construction.
pub mode: OmnimeshMode,
/// Maximum number of messages kept in the inbox; messages that arrive while the
/// inbox already holds this many are dropped and counted as dropped.
pub receive_buffer_capacity: usize,
/// When `Some(port)`, the transport is configured with addresses on `127.0.0.1`
/// derived from `port`; when `None`, the transport is created from the mode and
/// the client's DID only.
pub listen_port: Option<u16>,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
mode: OmnimeshMode::development(),
receive_buffer_capacity: 1024,
listen_port: None,
}
}
}
impl ClientConfig {
pub fn development() -> Self {
Self {
mode: OmnimeshMode::development(),
..Default::default()
}
}
pub fn lightweight() -> Self {
Self {
mode: OmnimeshMode::lightweight(),
..Default::default()
}
}
pub fn lightweight_on_port(port: u16) -> Self {
Self {
mode: OmnimeshMode::lightweight(),
listen_port: Some(port),
..Default::default()
}
}
pub fn production() -> Self {
Self {
mode: OmnimeshMode::production(),
..Default::default()
}
}
pub fn production_on_port(port: u16) -> Self {
Self {
mode: OmnimeshMode::production(),
listen_port: Some(port),
..Default::default()
}
}
}
/// Builder for [`OmnimeshClient`]; both settings are optional.
#[derive(Default)]
pub struct ClientBuilder {
/// Configuration to use; `build` falls back to `ClientConfig::default()` when unset.
config: Option<ClientConfig>,
/// Signing key to use; `build` generates a fresh key from `OsRng` when unset.
signing_key: Option<SigningKey>,
}
impl ClientBuilder {
pub fn with_config(mut self, config: ClientConfig) -> Self {
self.config = Some(config);
self
}
pub fn with_signing_key(mut self, key: SigningKey) -> Self {
self.signing_key = Some(key);
self
}
pub fn build(self) -> Result<OmnimeshClient, String> {
let config = self.config.unwrap_or_default();
let signing_key = self
.signing_key
.unwrap_or_else(|| SigningKey::generate(&mut OsRng));
OmnimeshClient::new(config, signing_key)
}
}
/// Handle to a mesh client: sends signed envelopes and exposes an inbox that a
/// background poller thread fills.
///
/// Every non-`Copy` field is behind an `Arc`, so clones of this handle share the
/// same transport, inbox, counters and shutdown flag.
#[derive(Clone)]
pub struct OmnimeshClient {
/// Identity of this client, derived from the signing key's verifying-key bytes.
pub did: Did,
/// Key used to sign outgoing envelopes.
signing_key: Arc<SigningKey>,
/// Transport used to send envelopes and polled for incoming ones.
transport: Arc<TransportLayer>,
/// Used by the poller to verify incoming envelopes.
security: Arc<SecurityLayer>,
/// Routing table updated by `register_peer` and read by `known_peers`/`health`.
routing: Arc<RoutingTable>,
/// Queue of accepted messages, filled by the poller and drained by the receive methods.
inbox: Arc<Mutex<VecDeque<ReceivedMessage>>>,
/// Signalled when a message is enqueued and when `shutdown` is called.
inbox_notify: Arc<Condvar>,
/// Maximum number of messages the poller will keep in `inbox`.
receive_buffer_capacity: usize,
/// Counter incremented on each `send` to produce the envelope sequence number.
seq: Arc<Mutex<u64>>,
/// Set by `shutdown`; stops the poller loop and makes `send` fail.
shutdown: Arc<AtomicBool>,
/// Number of messages discarded because the inbox was full.
pub(crate) messages_dropped: Arc<AtomicU64>,
/// Number of messages placed in the inbox.
pub(crate) messages_received: Arc<AtomicU64>,
/// Number of envelopes for which the transport reported a successful send.
pub(crate) messages_sent: Arc<AtomicU64>,
}
impl OmnimeshClient {
pub fn builder() -> ClientBuilder {
ClientBuilder::default()
}
/// Constructs a client for `signing_key` and starts its background poller thread.
///
/// The client's DID is derived from the verifying-key bytes of `signing_key`.
/// When `config.listen_port` is set, the transport is configured with
/// `127.0.0.1:<port>` (passed as the first two address arguments) and
/// `127.0.0.1:<port + 443>` (third argument), and shares this client's routing
/// table. Otherwise the transport is created from the mode and the DID alone.
///
/// # Errors
/// Returns an error if `port + 443` does not fit in a `u16`, if one of the
/// derived addresses fails to parse, or if the transport layer cannot be created.
fn new(config: ClientConfig, signing_key: SigningKey) -> Result<Self, String> {
    let did = Did::new(signing_key.verifying_key().to_bytes());
    let routing = Arc::new(RoutingTable::new());
    let transport = if let Some(port) = config.listen_port {
        use crate::runtime::transport::TransportConfig;
        // `port + 443` is u16 arithmetic: for ports above 65092 it would panic in
        // debug builds and silently wrap to a low port in release builds. Reject
        // such ports explicitly instead.
        let offset_port = port.checked_add(443).ok_or_else(|| {
            format!(
                "Listen port {} is too high: port + 443 exceeds {}",
                port,
                u16::MAX
            )
        })?;
        let listen_addr = format!("127.0.0.1:{}", port)
            .parse()
            .map_err(|e| format!("Invalid listen address for port {}: {:?}", port, e))?;
        let offset_addr = format!("127.0.0.1:{}", offset_port)
            .parse()
            .map_err(|e| format!("Invalid address for port {}: {:?}", offset_port, e))?;
        let transport_config = TransportConfig::new(listen_addr, listen_addr, offset_addr);
        Arc::new(TransportLayer::with_config_and_routing(
            &config.mode,
            transport_config,
            routing.clone(),
        )?)
    } else {
        Arc::new(TransportLayer::new_with_did(&config.mode, did)?)
    };
    let security = Arc::new(SecurityLayer::new(&config.mode, None));
    let inbox = Arc::new(Mutex::new(VecDeque::new()));
    let inbox_notify = Arc::new(Condvar::new());
    let shutdown = Arc::new(AtomicBool::new(false));
    let client = Self {
        did,
        signing_key: Arc::new(signing_key),
        transport,
        security,
        routing,
        inbox,
        inbox_notify,
        receive_buffer_capacity: config.receive_buffer_capacity,
        seq: Arc::new(Mutex::new(0)),
        shutdown,
        messages_dropped: Arc::new(AtomicU64::new(0)),
        messages_received: Arc::new(AtomicU64::new(0)),
        messages_sent: Arc::new(AtomicU64::new(0)),
    };
    // The poller is started only after every shared handle exists, because it
    // clones them out of `client`.
    client.start_poller();
    Ok(client)
}
/// Records `addr` as the route for peer `did` in the routing table.
///
/// # Errors
/// Returns an error if `addr` cannot be parsed as a socket address.
pub fn register_peer(&self, did: Did, addr: &str) -> Result<(), String> {
    let peer_addr = match addr.parse() {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("Invalid address '{}': {}", addr, e)),
    };
    self.routing.update_route(did, peer_addr);
    Ok(())
}
/// Encodes `payload`, wraps it in a signed envelope addressed to `recipient`,
/// and hands it to the transport.
///
/// The sequence counter is advanced before the envelope is built, so a send that
/// fails later still consumes a sequence number. `messages_sent` is incremented
/// only when the transport reports success.
///
/// # Errors
/// Returns an error if the client is shut down, the encoded payload is empty or
/// larger than `DEFAULT_PAYLOAD_CAPACITY`, the sequence lock is poisoned, the
/// payload cannot be stored, or the transport fails.
pub fn send(&self, recipient: Did, payload: EnvelopePayload) -> Result<(), String> {
    if self.shutdown.load(Ordering::Relaxed) {
        return Err("Client has been shut down".to_string());
    }

    let wire_bytes = encode_payload(&payload);
    match wire_bytes.len() {
        0 => return Err("Cannot send empty payload".to_string()),
        size if size > DEFAULT_PAYLOAD_CAPACITY => {
            return Err(format!(
                "Payload too large: {} bytes (max {})",
                size, DEFAULT_PAYLOAD_CAPACITY
            ));
        }
        _ => {}
    }

    let sequence_number = match self.seq.lock() {
        Ok(mut counter) => {
            *counter += 1;
            *counter
        }
        Err(poisoned) => return Err(poisoned.to_string()),
    };

    // A clock earlier than the Unix epoch is reported as timestamp 0.
    let timestamp_us = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    };

    // Message id layout: little-endian sequence number, then little-endian timestamp.
    let mut id_bytes = [0u8; 16];
    {
        let (seq_half, time_half) = id_bytes.split_at_mut(8);
        seq_half.copy_from_slice(&sequence_number.to_le_bytes());
        time_half.copy_from_slice(&timestamp_us.to_le_bytes());
    }

    let header = EnvelopeHeader {
        version: 7,
        message_id: MessageId(id_bytes),
        sender_did: self.did,
        recipient_did: recipient,
        sequence_number,
        timestamp_us,
        priority: Priority::Normal,
        payload_type: PayloadType::Raw,
    };

    let mut storage = PayloadStorage::<DEFAULT_PAYLOAD_CAPACITY>::new();
    if let Err(e) = storage.push_bytes(&wire_bytes) {
        return Err(format!("{:?}", e));
    }

    let envelope = SignedEnvelope::sign(header, storage, &self.signing_key);
    self.transport.send(&envelope).map(|sent| {
        self.messages_sent.fetch_add(1, Ordering::Relaxed);
        sent
    })
}
/// Removes and returns the oldest inbox message without blocking.
///
/// Returns `None` when the inbox is empty or its lock is poisoned.
pub fn try_receive(&self) -> Option<ReceivedMessage> {
    match self.inbox.lock() {
        Ok(mut queue) => queue.pop_front(),
        Err(_) => None,
    }
}
/// Waits up to `timeout` for a message and returns the oldest one in the inbox.
///
/// Returns `None` when the timeout elapses with an empty inbox, when the client
/// has been shut down and the inbox is empty, or when the inbox lock is poisoned.
/// Messages already queued are still returned after shutdown.
///
/// Very large timeouts (for example `Duration::MAX`) are accepted: if the
/// deadline cannot be represented as an `Instant`, the full `timeout` is used
/// for every wait instead of panicking.
pub fn receive_timeout(&self, timeout: Duration) -> Option<ReceivedMessage> {
    // `Instant + Duration` panics when the sum is not representable, so compute
    // the deadline with `checked_add`; `None` means "no representable deadline".
    let deadline = Instant::now().checked_add(timeout);
    let mut guard = self.inbox.lock().ok()?;
    loop {
        if let Some(msg) = guard.pop_front() {
            return Some(msg);
        }
        if self.shutdown.load(Ordering::Relaxed) {
            return None;
        }
        let remaining = match deadline {
            Some(at) => at.saturating_duration_since(Instant::now()),
            None => timeout,
        };
        if remaining.is_zero() {
            return None;
        }
        let (new_guard, timeout_result) =
            self.inbox_notify.wait_timeout(guard, remaining).ok()?;
        guard = new_guard;
        if timeout_result.timed_out() {
            // One last look: a message may have been queued right at the deadline.
            return guard.pop_front();
        }
        // Otherwise this was a notification or a spurious wakeup; re-check state.
    }
}
/// Number of messages currently queued in the inbox (0 if the lock is poisoned).
pub fn inbox_len(&self) -> usize {
    match self.inbox.lock() {
        Ok(queue) => queue.len(),
        Err(_) => 0,
    }
}
/// Returns the `(DID, address)` pairs reported by the routing table's
/// `gossip_routes`.
pub fn known_peers(&self) -> Vec<(Did, std::net::SocketAddr)> {
    let routes: Vec<(Did, std::net::SocketAddr)> = self.routing.gossip_routes();
    routes
}
/// Marks the client as shut down and wakes every thread blocked in
/// `receive_timeout`.
///
/// After this call `send` fails and the poller loop exits on its next check of
/// the flag. Messages already in the inbox remain retrievable.
pub fn shutdown(&self) {
    // Hold the inbox mutex while setting the flag and notifying. A receiver checks
    // the flag while holding this mutex and only releases it when it parks on the
    // condvar, so taking the lock here guarantees the receiver either sees the
    // flag or is already parked and receives the notification. Without the lock
    // the notification could fall between the check and the park and be lost,
    // leaving the receiver asleep for its whole timeout. A poisoned lock still
    // yields the guard inside the error, so the result is simply kept alive.
    let _inbox_guard = self.inbox.lock();
    self.shutdown.store(true, Ordering::SeqCst);
    self.inbox_notify.notify_all();
}
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::Relaxed)
}
/// Takes a snapshot of the client's state and counters.
///
/// Each value is read separately, so the snapshot is not atomic across fields.
pub fn health(&self) -> ClientHealth {
    let is_shutdown = self.shutdown.load(Ordering::Relaxed);
    let inbox_len = self.inbox_len();
    let messages_sent = self.messages_sent.load(Ordering::Relaxed);
    let messages_received = self.messages_received.load(Ordering::Relaxed);
    let messages_dropped = self.messages_dropped.load(Ordering::Relaxed);
    let known_peers = self.routing.gossip_routes().len();

    ClientHealth {
        did: self.did,
        is_shutdown,
        inbox_len,
        inbox_capacity: self.receive_buffer_capacity,
        messages_sent,
        messages_received,
        messages_dropped,
        known_peers,
    }
}
/// Spawns the background thread that moves incoming envelopes into the inbox.
///
/// Until the shutdown flag is set, the thread repeatedly asks the transport for
/// an envelope and sleeps 500 µs when none is available. An envelope is silently
/// skipped if it is addressed to another DID, fails verification, or cannot be
/// decoded. Accepted messages are queued and one waiter is notified; when the
/// inbox is already at capacity the new message is discarded and counted in
/// `messages_dropped`.
///
/// # Panics
/// Panics if the operating system refuses to spawn the thread.
fn start_poller(&self) {
    let me = self.did;
    let inbox_limit = self.receive_buffer_capacity;
    let link = self.transport.clone();
    let verifier = self.security.clone();
    let mailbox = self.inbox.clone();
    let wakeup = self.inbox_notify.clone();
    let stop_flag = self.shutdown.clone();
    let received_total = self.messages_received.clone();
    let dropped_total = self.messages_dropped.clone();

    let worker = move || loop {
        if stop_flag.load(Ordering::Relaxed) {
            break;
        }

        let envelope = match link.receive() {
            Some(incoming) => incoming,
            None => {
                // Nothing pending: back off briefly instead of spinning.
                thread::sleep(Duration::from_micros(500));
                continue;
            }
        };

        // The recipient check comes first so verification only runs for our own mail.
        if envelope.header.recipient_did != me || verifier.verify(&envelope).is_err() {
            continue;
        }

        let payload = match decode_payload(envelope.payload.as_slice()) {
            Ok(decoded) => decoded,
            Err(_) => continue,
        };

        let received_at_us = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as u64;
        let message = ReceivedMessage {
            sender: envelope.header.sender_did,
            payload,
            received_at_us,
        };

        let mut queue = match mailbox.lock() {
            Ok(locked) => locked,
            Err(_) => continue,
        };
        if queue.len() >= inbox_limit {
            dropped_total.fetch_add(1, Ordering::Relaxed);
            continue;
        }
        queue.push_back(message);
        received_total.fetch_add(1, Ordering::Relaxed);
        wakeup.notify_one();
    };

    thread::Builder::new()
        .name(format!("omnimesh-poller-{}", hex::encode(&me.0[..4])))
        .spawn(worker)
        .expect("Failed to spawn poller thread");
}
}
impl std::fmt::Debug for OmnimeshClient {
    /// Formats a summary of the client: hex-encoded DID, inbox length, shutdown
    /// state, and the sent/received counters. Keys and internal handles are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let did_hex = hex::encode(&self.did.0);
        let inbox_len = self.inbox_len();
        let is_shutdown = self.is_shutdown();
        let sent = self.messages_sent.load(Ordering::Relaxed);
        let received = self.messages_received.load(Ordering::Relaxed);

        let mut out = f.debug_struct("OmnimeshClient");
        out.field("did", &did_hex);
        out.field("inbox_len", &inbox_len);
        out.field("is_shutdown", &is_shutdown);
        out.field("messages_sent", &sent);
        out.field("messages_received", &received);
        out.finish()
    }
}
/// Point-in-time snapshot of a client's state, as produced by `OmnimeshClient::health`.
#[derive(Debug, Clone)]
pub struct ClientHealth {
/// DID of the client the snapshot was taken from.
pub did: Did,
/// Whether the client's shutdown flag was set.
pub is_shutdown: bool,
/// Number of messages queued in the inbox.
pub inbox_len: usize,
/// Configured maximum number of messages the inbox may hold.
pub inbox_capacity: usize,
/// Value of the client's sent-message counter.
pub messages_sent: u64,
/// Value of the client's received-message counter.
pub messages_received: u64,
/// Value of the client's dropped-message counter.
pub messages_dropped: u64,
/// Number of routes reported by the routing table.
pub known_peers: usize,
}
impl ClientHealth {
    /// A client is healthy when it is not shut down and its inbox still has room.
    pub fn is_healthy(&self) -> bool {
        if self.is_shutdown {
            return false;
        }
        self.inbox_capacity > self.inbox_len
    }

    /// Fraction of the inbox capacity currently in use.
    ///
    /// A zero-capacity inbox is reported as fully utilized (`1.0`).
    pub fn inbox_utilization(&self) -> f64 {
        match self.inbox_capacity {
            0 => 1.0,
            capacity => self.inbox_len as f64 / capacity as f64,
        }
    }
}