1pub mod config;
23pub mod types;
24
25
26use async_zmq_types::{InnerSocket, IntoInnerSocket, Multipart, SocketBuilder};
27use futures::{Async, task::Task};
28use mio::Ready;
29use std::{fmt, sync::Arc};
30use tokio_reactor::PollEvented;
31
32use zmq;
33
34use crate::{
35 async_types::{
36 EventedFile, MultipartRequest, MultipartResponse, MultipartSink, MultipartSinkStream,
37 MultipartStream,
38 },
39 error::Error,
40 file::ZmqFile,
41};
42
43pub struct Socket {
46 sock: zmq::Socket,
48 file: EventedFile,
50}
51
52impl Socket {
53 pub fn builder<T>(ctx: Arc<zmq::Context>) -> SocketBuilder<'static, T>
55 where
56 T: IntoInnerSocket,
57 {
58 SocketBuilder::new(ctx)
59 }
60
61 pub fn inner(self) -> (zmq::Socket, EventedFile) {
63 (self.sock, self.file)
64 }
65
66 pub fn from_sock_and_file(sock: zmq::Socket, file: EventedFile) -> Self {
71 Socket { sock, file }
72 }
73
74 pub fn from_sock(sock: zmq::Socket) -> Result<Self, Error> {
79 let fd = sock.get_fd()?;
80 let file = PollEvented::new(ZmqFile::from_raw_fd(fd));
81
82 Ok(Socket { sock, file })
83 }
84
85 pub(crate) fn send_msg(&self, msg: zmq::Message, flags: i32) -> zmq::Result<()> {
86 self.sock.send(msg, flags)
87 }
88
89 pub(crate) fn recv_msg(&self, msg: &mut zmq::Message) -> zmq::Result<()> {
90 self.sock.recv(msg, zmq::DONTWAIT)
91 }
92
93 pub(crate) fn poll_read_ready(&self, mask: Ready, task: Option<&Task>) -> Result<Async<Ready>, Error> {
94 let _ = self.file.poll_read_ready(mask)?;
95
96 let events = self.sock.get_events()?;
97
98 if let Some(task) = task {
99 if events & zmq::POLLOUT == zmq::POLLOUT {
100 task.notify();
101 }
102 }
103
104 if events & zmq::POLLIN == zmq::POLLIN {
105 return Ok(Async::Ready(mask));
106 }
107
108 self.file.clear_read_ready(mask)?;
109 Ok(Async::NotReady)
110 }
111
112 pub(crate) fn poll_write_ready(&self, task: Option<&Task>) -> Result<Async<()>, Error> {
113 let _ = self.file.poll_write_ready()?;
114
115 let events = self.sock.get_events()?;
116
117 if let Some(task) = task {
118 if events & zmq::POLLIN == zmq::POLLIN {
119 task.notify();
120 }
121 }
122
123 if events & zmq::POLLOUT == zmq::POLLOUT {
124 return Ok(Async::Ready(()));
125 }
126
127 self.file.clear_write_ready()?;
128 Ok(Async::NotReady)
129 }
130
131 pub(crate) fn clear_read_ready(&self, mask: Ready) -> Result<(), std::io::Error> {
132 self.file.clear_read_ready(mask)
133 }
134
135 pub(crate) fn clear_write_ready(&self) -> Result<(), std::io::Error> {
136 self.file.clear_write_ready()
137 }
138}
139
140impl<T> InnerSocket<T> for Socket
141where
142 T: IntoInnerSocket + From<Self>,
143{
144 type Request = MultipartRequest<T>;
145 type Response = MultipartResponse<T>;
146
147 type Sink = MultipartSink<T>;
148 type Stream = MultipartStream<T>;
149
150 type SinkStream = MultipartSinkStream<T>;
151
152 fn send(self, multipart: Multipart) -> Self::Request {
153 MultipartRequest::new(self, multipart)
154 }
155
156 fn recv(self) -> Self::Response {
157 MultipartResponse::new(self)
158 }
159
160 fn stream(self) -> Self::Stream {
161 MultipartStream::new(self)
162 }
163
164 fn sink(self, buffer_size: usize) -> Self::Sink {
165 MultipartSink::new(buffer_size, self)
166 }
167
168 fn sink_stream(self, buffer_size: usize) -> Self::SinkStream {
169 MultipartSinkStream::new(buffer_size, self)
170 }
171}
172
173impl From<(zmq::Socket, EventedFile)> for Socket {
174 fn from((sock, file): (zmq::Socket, EventedFile)) -> Self {
175 Socket { sock, file }
176 }
177}
178
179impl fmt::Debug for Socket {
180 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
181 write!(f, "Socket")
182 }
183}
184
185impl fmt::Display for Socket {
186 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
187 write!(f, "Socket")
188 }
189}