1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
mod builder;
mod envelop;
mod handler;
pub mod msgs;
mod receiver;
pub mod receivers;
mod trait_object;
mod untyped;
mod utils;

use builder::BusBuilder;
pub use envelop::Message;
pub use handler::*;
pub use receiver::SendError;
use receiver::{Receiver, ReceiverStats};
use utils::binary_search_range_by_key;

use core::any::TypeId;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

pub use untyped::Untyped;

/// Result alias used by this crate: `Ok(())` or an [`anyhow::Error`].
pub type Result = anyhow::Result<()>;

/// Shared state behind a [`Bus`]: the registered receivers and a closed flag.
pub struct BusInner {
    /// Receivers paired with the `TypeId` of the message they were registered
    /// for. Kept sorted by `TypeId` (see [`BusInner::new`]) so that lookups
    /// can use a binary search.
    receivers: Vec<(TypeId, Receiver)>,
    /// Set by [`BusInner::close`]; once `true`, sends are rejected.
    closed: AtomicBool,
}

impl BusInner {
    /// Creates the bus state from a list of `(message type, receiver)` pairs.
    ///
    /// The list is sorted by `TypeId` so that all receivers for one message
    /// type are adjacent and can be located with a binary search.
    pub(crate) fn new(mut receivers: Vec<(TypeId, Receiver)>) -> Self {
        receivers.sort_unstable_by_key(|(k, _)| *k);

        Self {
            receivers,
            closed: AtomicBool::new(false),
        }
    }

    /// Marks the bus as closed and then calls `close` on every receiver.
    ///
    /// After this call [`try_send`](Self::try_send) and
    /// [`send`](Self::send) return [`SendError::Closed`].
    pub fn close(&self) {
        self.closed.store(true, Ordering::SeqCst);

        for (_, r) in &self.receivers {
            r.close();
        }
    }

    /// Awaits `sync` on every receiver, one after another.
    pub async fn sync(&self) {
        for (_, r) in &self.receivers {
            r.sync().await;
        }
    }

    /// Returns an iterator yielding the stats of every registered receiver,
    /// in the bus's internal (`TypeId`-sorted) order.
    pub fn stats(&self) -> impl Iterator<Item = ReceiverStats> + '_ {
        self.receivers.iter().map(|(_, r)| r.stats())
    }

    /// Returns the contiguous run of receivers registered for `tid`.
    ///
    /// An empty slice means "no receiver for this type". The range produced
    /// by the search helper is applied with a checked `get`, so an empty or
    /// out-of-bounds range can never select a receiver that belongs to a
    /// different message type.
    fn receivers_for(&self, tid: TypeId) -> &[(TypeId, Receiver)] {
        let range = binary_search_range_by_key(&self.receivers, &tid, |(k, _)| *k);

        self.receivers
            .get(range.start..range.end)
            .unwrap_or_default()
    }

    /// Attempts to deliver `msg` to every receiver registered for `M`
    /// without waiting.
    ///
    /// All receivers but the first get a clone of the message; the first one
    /// gets the original, and is served last. If no receiver is registered
    /// for `M`, a notice is printed to stdout and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns [`SendError::Closed`] if the bus has been closed. Any error
    /// from a receiver's `try_broadcast` is propagated immediately; receivers
    /// served before the failing one have already been handed the message.
    pub fn try_send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
        if self.closed.load(Ordering::SeqCst) {
            return Err(SendError::Closed(msg));
        }

        match self.receivers_for(TypeId::of::<M>()).split_first() {
            Some(((_, first), rest)) => {
                for (_, r) in rest {
                    r.try_broadcast(msg.clone())?;
                }

                // The original message goes to the first receiver, saving a clone.
                first.try_broadcast(msg)?;
            }
            None => println!("Unhandled message {:?}", core::any::type_name::<M>()),
        }

        Ok(())
    }

    /// Blocking variant of [`send`](Self::send): drives the send future to
    /// completion on the current thread via `futures::executor::block_on`.
    ///
    /// # Errors
    ///
    /// Same as [`send`](Self::send).
    #[inline]
    pub fn send_blocking<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
        futures::executor::block_on(self.send(msg))
    }

    /// Delivers `msg` to every receiver registered for `M`, awaiting each
    /// receiver's `broadcast` in turn.
    ///
    /// All receivers but the first get a clone of the message; the first one
    /// gets the original, and is served last. If no receiver is registered
    /// for `M`, a notice is printed to stdout and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns [`SendError::Closed`] if the bus has been closed. Any error
    /// from a receiver's `broadcast` is propagated immediately; receivers
    /// served before the failing one have already been handed the message.
    pub async fn send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
        if self.closed.load(Ordering::SeqCst) {
            return Err(SendError::Closed(msg));
        }

        match self.receivers_for(TypeId::of::<M>()).split_first() {
            Some(((_, first), rest)) => {
                for (_, r) in rest {
                    r.broadcast(msg.clone()).await?;
                }

                // The original message goes to the first receiver, saving a clone.
                first.broadcast(msg).await?;
            }
            None => println!("Unhandled message {:?}", core::any::type_name::<M>()),
        }

        Ok(())
    }
}

/// Cloneable handle to a shared [`BusInner`].
///
/// Cloning copies only the `Arc`, so all clones refer to the same receivers
/// and closed flag. The handle dereferences to [`BusInner`], which is where
/// the send/close/sync methods live.
#[derive(Clone)]
pub struct Bus {
    inner: Arc<BusInner>,
}

impl core::ops::Deref for Bus {
    type Target = BusInner;

    fn deref(&self) -> &Self::Target {
        self.inner.as_ref()
    }
}

impl Bus {
    /// Starts building a bus; returns a fresh [`BusBuilder`].
    #[inline]
    pub fn build() -> BusBuilder {
        BusBuilder::new()
    }
}