Skip to main content

zeromq/
push.rs

1use crate::backend::GenericSocketBackend;
2use crate::codec::Message;
3use crate::transport::AcceptStopHandle;
4use crate::{
5    CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions,
6    SocketSend, SocketType, ZmqMessage, ZmqResult,
7};
8
9use async_trait::async_trait;
10use futures::channel::mpsc;
11
12use std::collections::hash_map::RandomState;
13use std::collections::HashMap;
14use std::sync::Arc;
15
/// Socket handle for the `PUSH` socket type.
///
/// All work is delegated to a shared [`GenericSocketBackend`]; this struct
/// additionally keeps the stop handles of the endpoints it has been bound to.
pub struct PushSocket {
    /// Backend shared behind an `Arc`; it is shut down when this socket is dropped.
    backend: Arc<GenericSocketBackend>,
    /// Stop handles for accept loops, keyed by the endpoint they belong to.
    binds: HashMap<Endpoint, AcceptStopHandle>,
}
20
21impl Drop for PushSocket {
22    fn drop(&mut self) {
23        self.backend.shutdown();
24    }
25}
26
27#[async_trait]
28impl Socket for PushSocket {
29    fn with_options(options: SocketOptions) -> Self {
30        Self {
31            backend: Arc::new(GenericSocketBackend::with_options(
32                None,
33                SocketType::PUSH,
34                options,
35            )),
36            binds: HashMap::new(),
37        }
38    }
39
40    fn backend(&self) -> Arc<dyn MultiPeerBackend> {
41        self.backend.clone()
42    }
43
44    fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle, RandomState> {
45        &mut self.binds
46    }
47
48    fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
49        let (sender, receiver) = mpsc::channel(1024);
50        self.backend.socket_monitor.lock().replace(sender);
51        receiver
52    }
53}
54
55#[async_trait]
56impl SocketSend for PushSocket {
57    async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()> {
58        self.backend
59            .send_round_robin(Message::Message(message))
60            .await?;
61        Ok(())
62    }
63}
64
// Empty impl: `PushSocket` overrides nothing, so everything `CaptureSocket`
// offers comes from the trait's own provided items.
impl CaptureSocket for PushSocket {}