tokio_zmq/async_types/sink.rs
1/*
2 * This file is part of Tokio ZMQ.
3 *
4 * Copyright © 2018 Riley Trautman
5 *
6 * Tokio ZMQ is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Tokio ZMQ is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Tokio ZMQ. If not, see <http://www.gnu.org/licenses/>.
18 */
19
//! This module defines the `MultipartSink` type, a wrapper around Sockets that implements
//! `futures::Sink`.
22
23use std::{fmt, marker::PhantomData};
24
25use async_zmq_types::{IntoSocket, Multipart};
26use futures::{Async, AsyncSink, Sink};
27
28use crate::{
29 async_types::sink_type::SinkType,
30 error::Error,
31 socket::Socket,
32};
33
34/// The `MultipartSink` Sink handles sending streams of data to ZeroMQ Sockets.
35///
/// You shouldn't ever need to manually create one. Here's how to get one from a 'raw' `Socket`
/// type.
38///
39/// ### Example
40/// ```rust
41/// extern crate zmq;
42/// extern crate futures;
43/// extern crate tokio;
44/// extern crate tokio_zmq;
45///
46/// use std::sync::Arc;
47///
48/// use futures::{Future, Sink};
49/// use tokio_zmq::{prelude::*, Error, Multipart, Pub, Socket};
50///
51/// fn main() {
52/// let context = Arc::new(zmq::Context::new());
53/// let fut = Pub::builder(context)
54/// .bind("tcp://*:5568")
55/// .build()
56/// .and_then(|zpub| {
57/// let sink = zpub.sink(25);
58///
59/// let msg = zmq::Message::from("Some message");
60///
61/// sink.send(msg.into())
62/// });
63///
64/// // tokio::run(fut.map(|_| ()).map_err(|_| ()));
65/// }
66/// ```
67pub struct MultipartSink<T>
68where
69 T: From<Socket>,
70{
71 sock: Socket,
72 inner: SinkType,
73 phantom: PhantomData<T>,
74}
75
76impl<T> MultipartSink<T>
77where
78 T: From<Socket>,
79{
80 pub fn new(buffer_size: usize, sock: Socket) -> Self {
81 MultipartSink {
82 sock,
83 inner: SinkType::new(buffer_size),
84 phantom: PhantomData,
85 }
86 }
87}
88
89impl<T> IntoSocket<T, Socket> for MultipartSink<T>
90where
91 T: From<Socket>,
92{
93 fn into_socket(self) -> T {
94 T::from(self.sock)
95 }
96}
97
98impl<T> Sink for MultipartSink<T>
99where
100 T: From<Socket>,
101{
102 type SinkItem = Multipart;
103 type SinkError = Error;
104
105 fn start_send(
106 &mut self,
107 multipart: Self::SinkItem,
108 ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
109 self.inner.start_send(multipart, &self.sock, None)
110 }
111
112 fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
113 self.inner.poll_complete(&self.sock, None)
114 }
115}
116
117impl<T> fmt::Debug for MultipartSink<T>
118where
119 T: From<Socket>,
120{
121 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122 write!(f, "MultipartSink")
123 }
124}
125
126impl<T> fmt::Display for MultipartSink<T>
127where
128 T: From<Socket>,
129{
130 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
131 write!(f, "MultipartSink")
132 }
133}