tokio_zmq/async_types/
sink.rs

1/*
2 * This file is part of Tokio ZMQ.
3 *
4 * Copyright © 2018 Riley Trautman
5 *
6 * Tokio ZMQ is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Tokio ZMQ is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Tokio ZMQ.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20//! This module defines the `MultipartSink` type. A wrapper around Sockets that implements
21//! `futures::Sink`.
22
23use std::{fmt, marker::PhantomData};
24
25use async_zmq_types::{IntoSocket, Multipart};
26use futures::{Async, AsyncSink, Sink};
27
28use crate::{
29    async_types::sink_type::SinkType,
30    error::Error,
31    socket::Socket,
32};
33
34/// The `MultipartSink` Sink handles sending streams of data to ZeroMQ Sockets.
35///
36/// You shouldn't ever need to manually create one. Here's how to get one from a 'raw' `Socket`'
37/// type.
38///
39/// ### Example
40/// ```rust
41/// extern crate zmq;
42/// extern crate futures;
43/// extern crate tokio;
44/// extern crate tokio_zmq;
45///
46/// use std::sync::Arc;
47///
48/// use futures::{Future, Sink};
49/// use tokio_zmq::{prelude::*, Error, Multipart, Pub, Socket};
50///
51/// fn main() {
52///     let context = Arc::new(zmq::Context::new());
53///     let fut = Pub::builder(context)
54///         .bind("tcp://*:5568")
55///         .build()
56///         .and_then(|zpub| {
57///             let sink = zpub.sink(25);
58///
59///             let msg = zmq::Message::from("Some message");
60///
61///             sink.send(msg.into())
62///         });
63///
64///     // tokio::run(fut.map(|_| ()).map_err(|_| ()));
65/// }
66/// ```
67pub struct MultipartSink<T>
68where
69    T: From<Socket>,
70{
71    sock: Socket,
72    inner: SinkType,
73    phantom: PhantomData<T>,
74}
75
76impl<T> MultipartSink<T>
77where
78    T: From<Socket>,
79{
80    pub fn new(buffer_size: usize, sock: Socket) -> Self {
81        MultipartSink {
82            sock,
83            inner: SinkType::new(buffer_size),
84            phantom: PhantomData,
85        }
86    }
87}
88
89impl<T> IntoSocket<T, Socket> for MultipartSink<T>
90where
91    T: From<Socket>,
92{
93    fn into_socket(self) -> T {
94        T::from(self.sock)
95    }
96}
97
98impl<T> Sink for MultipartSink<T>
99where
100    T: From<Socket>,
101{
102    type SinkItem = Multipart;
103    type SinkError = Error;
104
105    fn start_send(
106        &mut self,
107        multipart: Self::SinkItem,
108    ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
109        self.inner.start_send(multipart, &self.sock, None)
110    }
111
112    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
113        self.inner.poll_complete(&self.sock, None)
114    }
115}
116
117impl<T> fmt::Debug for MultipartSink<T>
118where
119    T: From<Socket>,
120{
121    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122        write!(f, "MultipartSink")
123    }
124}
125
126impl<T> fmt::Display for MultipartSink<T>
127where
128    T: From<Socket>,
129{
130    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
131        write!(f, "MultipartSink")
132    }
133}