1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
use crate::backend::GenericSocketBackend; use crate::codec::Message; use crate::transport::AcceptStopHandle; use crate::{ BlockingSend, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketType, ZmqMessage, ZmqResult, }; use async_trait::async_trait; use futures::channel::mpsc; use std::collections::hash_map::RandomState; use std::collections::HashMap; use std::sync::Arc; pub struct PushSocket { backend: Arc<GenericSocketBackend>, binds: HashMap<Endpoint, AcceptStopHandle>, } impl Drop for PushSocket { fn drop(&mut self) { self.backend.shutdown(); } } #[async_trait] impl Socket for PushSocket { fn new() -> Self { Self { backend: Arc::new(GenericSocketBackend::new(None, SocketType::PUSH)), binds: HashMap::new(), } } fn backend(&self) -> Arc<dyn MultiPeerBackend> { self.backend.clone() } fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle, RandomState> { &mut self.binds } fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> { let (sender, receiver) = mpsc::channel(1024); self.backend.socket_monitor.lock().replace(sender); receiver } } #[async_trait] impl BlockingSend for PushSocket { async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()> { self.backend .send_round_robin(Message::Message(message)) .await?; Ok(()) } }