tokio_zmq/socket/
mod.rs

1/*
2 * This file is part of Tokio ZMQ.
3 *
4 * Copyright © 2018 Riley Trautman
5 *
6 * Tokio ZMQ is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Tokio ZMQ is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Tokio ZMQ.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20//! This module contains useful types for working with ZeroMQ Sockets.
21
22pub mod config;
23pub mod types;
24
25
26use async_zmq_types::{InnerSocket, IntoInnerSocket, Multipart, SocketBuilder};
27use futures::{Async, task::Task};
28use mio::Ready;
29use std::{fmt, sync::Arc};
30use tokio_reactor::PollEvented;
31
32use zmq;
33
34use crate::{
35    async_types::{
36        EventedFile, MultipartRequest, MultipartResponse, MultipartSink, MultipartSinkStream,
37        MultipartStream,
38    },
39    error::Error,
40    file::ZmqFile,
41};
42
43/// Defines the raw Socket type. This type should never be interacted with directly, except to
44/// create new instances of wrapper types.
45pub struct Socket {
46    // Reads and Writes data
47    sock: zmq::Socket,
48    // So we can hand out files to streams and sinks
49    file: EventedFile,
50}
51
52impl Socket {
53    /// Start a new Socket Config builder
54    pub fn builder<T>(ctx: Arc<zmq::Context>) -> SocketBuilder<'static, T>
55    where
56        T: IntoInnerSocket,
57    {
58        SocketBuilder::new(ctx)
59    }
60
61    /// Retrieve a Reference-Counted Pointer to self's socket.
62    pub fn inner(self) -> (zmq::Socket, EventedFile) {
63        (self.sock, self.file)
64    }
65
66    /// Create a new socket from a given Sock and File
67    ///
68    /// This assumes that `sock` is already configured properly. Please don't call this directly
69    /// unless you know what you're doing.
70    pub fn from_sock_and_file(sock: zmq::Socket, file: EventedFile) -> Self {
71        Socket { sock, file }
72    }
73
74    /// Create a new socket from a given Sock
75    ///
76    /// This assumes that `sock` is already configured properly. Please don't call this directly
77    /// unless you know what you're doing.
78    pub fn from_sock(sock: zmq::Socket) -> Result<Self, Error> {
79        let fd = sock.get_fd()?;
80        let file = PollEvented::new(ZmqFile::from_raw_fd(fd));
81
82        Ok(Socket { sock, file })
83    }
84
85    pub(crate) fn send_msg(&self, msg: zmq::Message, flags: i32) -> zmq::Result<()> {
86        self.sock.send(msg, flags)
87    }
88
89    pub(crate) fn recv_msg(&self, msg: &mut zmq::Message) -> zmq::Result<()> {
90        self.sock.recv(msg, zmq::DONTWAIT)
91    }
92
93    pub(crate) fn poll_read_ready(&self, mask: Ready, task: Option<&Task>) -> Result<Async<Ready>, Error> {
94        let _ = self.file.poll_read_ready(mask)?;
95
96        let events = self.sock.get_events()?;
97
98        if let Some(task) = task {
99            if events & zmq::POLLOUT == zmq::POLLOUT {
100                task.notify();
101            }
102        }
103
104        if events & zmq::POLLIN == zmq::POLLIN {
105            return Ok(Async::Ready(mask));
106        }
107
108        self.file.clear_read_ready(mask)?;
109        Ok(Async::NotReady)
110    }
111
112    pub(crate) fn poll_write_ready(&self, task: Option<&Task>) -> Result<Async<()>, Error> {
113        let _ = self.file.poll_write_ready()?;
114
115        let events = self.sock.get_events()?;
116
117        if let Some(task) = task {
118            if events & zmq::POLLIN == zmq::POLLIN {
119                task.notify();
120            }
121        }
122
123        if events & zmq::POLLOUT == zmq::POLLOUT {
124            return Ok(Async::Ready(()));
125        }
126
127        self.file.clear_write_ready()?;
128        Ok(Async::NotReady)
129    }
130
131    pub(crate) fn clear_read_ready(&self, mask: Ready) -> Result<(), std::io::Error> {
132        self.file.clear_read_ready(mask)
133    }
134
135    pub(crate) fn clear_write_ready(&self) -> Result<(), std::io::Error> {
136        self.file.clear_write_ready()
137    }
138}
139
140impl<T> InnerSocket<T> for Socket
141where
142    T: IntoInnerSocket + From<Self>,
143{
144    type Request = MultipartRequest<T>;
145    type Response = MultipartResponse<T>;
146
147    type Sink = MultipartSink<T>;
148    type Stream = MultipartStream<T>;
149
150    type SinkStream = MultipartSinkStream<T>;
151
152    fn send(self, multipart: Multipart) -> Self::Request {
153        MultipartRequest::new(self, multipart)
154    }
155
156    fn recv(self) -> Self::Response {
157        MultipartResponse::new(self)
158    }
159
160    fn stream(self) -> Self::Stream {
161        MultipartStream::new(self)
162    }
163
164    fn sink(self, buffer_size: usize) -> Self::Sink {
165        MultipartSink::new(buffer_size, self)
166    }
167
168    fn sink_stream(self, buffer_size: usize) -> Self::SinkStream {
169        MultipartSinkStream::new(buffer_size, self)
170    }
171}
172
173impl From<(zmq::Socket, EventedFile)> for Socket {
174    fn from((sock, file): (zmq::Socket, EventedFile)) -> Self {
175        Socket { sock, file }
176    }
177}
178
179impl fmt::Debug for Socket {
180    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
181        write!(f, "Socket")
182    }
183}
184
185impl fmt::Display for Socket {
186    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
187        write!(f, "Socket")
188    }
189}