1use crate::backend::GenericSocketBackend;
2use crate::codec::Message;
3use crate::transport::AcceptStopHandle;
4use crate::{
5 CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions,
6 SocketSend, SocketType, ZmqMessage, ZmqResult,
7};
8
9use async_trait::async_trait;
10use futures::channel::mpsc;
11
12use std::collections::hash_map::RandomState;
13use std::collections::HashMap;
14use std::sync::Arc;
15
16pub struct PushSocket {
17 backend: Arc<GenericSocketBackend>,
18 binds: HashMap<Endpoint, AcceptStopHandle>,
19}
20
21impl Drop for PushSocket {
22 fn drop(&mut self) {
23 self.backend.shutdown();
24 }
25}
26
27#[async_trait]
28impl Socket for PushSocket {
29 fn with_options(options: SocketOptions) -> Self {
30 Self {
31 backend: Arc::new(GenericSocketBackend::with_options(
32 None,
33 SocketType::PUSH,
34 options,
35 )),
36 binds: HashMap::new(),
37 }
38 }
39
40 fn backend(&self) -> Arc<dyn MultiPeerBackend> {
41 self.backend.clone()
42 }
43
44 fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle, RandomState> {
45 &mut self.binds
46 }
47
48 fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
49 let (sender, receiver) = mpsc::channel(1024);
50 self.backend.socket_monitor.lock().replace(sender);
51 receiver
52 }
53}
54
55#[async_trait]
56impl SocketSend for PushSocket {
57 async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()> {
58 self.backend
59 .send_round_robin(Message::Message(message))
60 .await?;
61 Ok(())
62 }
63}
64
65impl CaptureSocket for PushSocket {}