#![doc(html_playground_url = "https://play.rust-lang.org/")]
//! Work in Progress - not usable yet!
//!
//! # msgbus
//! `msgbus` provides the ability to publish messages on a channel (or bus) and subscribe to channels to receive all the messages
//! published on that channel. To support scalability, when you first register to publish on a channel, you indicate what
//! kind of bandwidth you will require. The overall message bus is managed by a Message Manager (`msgmgr`), and that `msgmgr` is configured
//! with various transport capabilities.
//!
//! # Theory of Operation
//! A message bus is a general communication mechanism which has a many-to-many relationship between publishers (many clients can publish on a given bus)
//! and subscribers (many clients can subscribe to a given bus). In an implementation where we can have many buses, we can have dedicated buses for one-to-one,
//! one-to-many and many-to-one relationships as needed.
//!
//! In this particular implementation, we can have many buses, called `Channels`, and a further enhancement has been added to support scalability. In the
//! simplest implementation, the publishers and subscribers are threads in a shared application. Communication between them is considered to be high
//! bandwidth as it can be implemented as shared/copied messages. In a slightly scaled up implementation, the publishers and subscribers may exist in
//! separate applications on the same processor. This medium bandwidth implementation can be implemented as shared memory between those applications or other
//! local mechanisms. A lower bandwidth implementation may have those publishers and subscribers existing on different connected processors.
//!
//! The enhancement to support this is called the `Transport` and is presented as a trait of which we provide several different examples. The clients
//! (publishers or subscribers) don't choose the transport, but rather the bandwidth they require.
//! If, during later development, the application must be split
//! across multiple processes, or multiple processors, the clients require almost no refactoring as they are independent of the transport.
//!
//! # Publishing
//! Publishing is a non-blocking call to the `msgbus` designated by the `rmb::Channel`. This is a simple `u32` whose meaning you define for your specific
//! application. What you send is a structure that implements the `rmb::Msg` trait. The msg will be put on the channel whether or not there are any subscribers.
//!
//! # Subscribing
//! When you subscribe to a particular channel, your handler will be called for all msgs received from that point forward. The handler may be a function
//! or closure which you pass to the subscribe call. The handler will be called in the thread context that the msgbus was created in.
//!
//! # Simple Example
//!
//! ```
//! use msgbus::{msgmgr,rmb,transport::internal};
//! use std::fmt;
//!
//! fn main() {
//!     struct MyMsg {
//!         s: String,
//!     }
//!     impl rmb::Msg for MyMsg {
//!
//!     }
//!     impl fmt::Display for MyMsg {
//!         fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
//!             write!(f, "{}", self.s)
//!         }
//!     }
//!     fn handler(_chan: rmb::Channel, msg: &dyn rmb::Msg)-> Result<String, String> {
//!         println!("{}", msg);
//!         Ok(msg.to_string())
//!     }
//!
//!     let t = internal::TransportInternal::new();
//!     let mut mm = msgmgr::MsgMgr::new(vec![(0..10,&t)]);
//!     let mut mb = rmb::Rmb::new(&mut mm);
//!     mb.init().unwrap();
//!     let hello = MyMsg { s: "Hello".to_string() };
//!     let chan = 1;
//!     mb.subscribe(chan, handler).unwrap();
//!     mb.publish(chan, &hello).unwrap();
//!
//! }
//! ```
pub mod rmb;
pub mod msgmgr;
pub mod transport;

#[cfg(test)]
mod tests {
    use super::{rmb, msgmgr, transport::local, transport::internal};

    #[test]
    fn test_init() {
        let t = local::TransportLocal::new();
        let mut mb = msgmgr::MsgMgr::new(vec![(0..10,&t)]);
        let mut r = rmb::Rmb::new(&mut mb);
        r.init().unwrap();
    }

    #[test]
    #[ignore]
    fn test_simple_subscribe_publish() {
        impl rmb::Msg for String {
        }
        fn handler(_chan: rmb::Channel, msg: &dyn rmb::Msg)-> Result<String, String> {
            println!("{}", msg);
            assert_eq!(msg.to_string(), "Hello".to_string());
            Ok(msg.to_string())
        }

        let t = internal::TransportInternal::new();
        let mut mb = msgmgr::MsgMgr::new(vec![(0..10,&t)]);
        let mut r = rmb::Rmb::new(&mut mb);
        r.init().unwrap();
        let hello = "Hello".to_string();
        let chan = 1;
        r.subscribe(chan, handler).unwrap();
        r.publish(chan, &hello).unwrap();
    }
}