rthrift/protocol/
multiplexed.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType,
19            TOutputProtocol, TSetIdentifier, TStructIdentifier};
20
21/// `TOutputProtocol` that prefixes the service name to all outgoing Thrift
22/// messages.
23///
24/// A `TMultiplexedOutputProtocol` should be used when multiple Thrift services
25/// send messages over a single I/O channel. By prefixing service identifiers
26/// to outgoing messages receivers are able to demux them and route them to the
27/// appropriate service processor. Rust receivers must use a `TMultiplexedProcessor`
28/// to process incoming messages, while other languages must use their
29/// corresponding multiplexed processor implementations.
30///
31/// For example, given a service `TestService` and a service call `test_call`,
32/// this implementation would identify messages as originating from
33/// `TestService:test_call`.
34///
35/// # Examples
36///
37/// Create and use a `TMultiplexedOutputProtocol`.
38///
39/// ```no_run
40/// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol};
41/// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol};
42/// use thrift::transport::TTcpChannel;
43///
44/// let mut channel = TTcpChannel::new();
45/// channel.open("localhost:9090").unwrap();
46///
47/// let protocol = TBinaryOutputProtocol::new(channel, true);
48/// let mut protocol = TMultiplexedOutputProtocol::new("service_name", protocol);
49///
50/// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1);
51/// protocol.write_message_begin(&ident).unwrap();
52/// ```
53#[derive(Debug)]
54pub struct TMultiplexedOutputProtocol<P>
55where
56    P: TOutputProtocol,
57{
58    service_name: String,
59    inner: P,
60}
61
62impl<P> TMultiplexedOutputProtocol<P>
63where
64    P: TOutputProtocol,
65{
66    /// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages
67    /// as originating from a service named `service_name` and sends them over
68    /// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent
69    /// by `wrapped`, not by this instance.
70    pub fn new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P> {
71        TMultiplexedOutputProtocol {
72            service_name: service_name.to_owned(),
73            inner: wrapped,
74        }
75    }
76}
77
78// FIXME: avoid passthrough methods
79impl<P> TOutputProtocol for TMultiplexedOutputProtocol<P>
80where
81    P: TOutputProtocol,
82{
83    fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> {
84        match identifier.message_type { // FIXME: is there a better way to override identifier here?
85            TMessageType::Call | TMessageType::OneWay => {
86                let identifier = TMessageIdentifier {
87                    name: format!("{}:{}", self.service_name, identifier.name),
88                    ..*identifier
89                };
90                self.inner.write_message_begin(&identifier)
91            }
92            _ => self.inner.write_message_begin(identifier),
93        }
94    }
95
96    fn write_message_end(&mut self) -> ::Result<()> {
97        self.inner.write_message_end()
98    }
99
100    fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> ::Result<()> {
101        self.inner.write_struct_begin(identifier)
102    }
103
104    fn write_struct_end(&mut self) -> ::Result<()> {
105        self.inner.write_struct_end()
106    }
107
108    fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()> {
109        self.inner.write_field_begin(identifier)
110    }
111
112    fn write_field_end(&mut self) -> ::Result<()> {
113        self.inner.write_field_end()
114    }
115
116    fn write_field_stop(&mut self) -> ::Result<()> {
117        self.inner.write_field_stop()
118    }
119
120    fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> {
121        self.inner.write_bytes(b)
122    }
123
124    fn write_bool(&mut self, b: bool) -> ::Result<()> {
125        self.inner.write_bool(b)
126    }
127
128    fn write_i8(&mut self, i: i8) -> ::Result<()> {
129        self.inner.write_i8(i)
130    }
131
132    fn write_i16(&mut self, i: i16) -> ::Result<()> {
133        self.inner.write_i16(i)
134    }
135
136    fn write_i32(&mut self, i: i32) -> ::Result<()> {
137        self.inner.write_i32(i)
138    }
139
140    fn write_i64(&mut self, i: i64) -> ::Result<()> {
141        self.inner.write_i64(i)
142    }
143
144    fn write_double(&mut self, d: f64) -> ::Result<()> {
145        self.inner.write_double(d)
146    }
147
148    fn write_string(&mut self, s: &str) -> ::Result<()> {
149        self.inner.write_string(s)
150    }
151
152    fn write_list_begin(&mut self, identifier: &TListIdentifier) -> ::Result<()> {
153        self.inner.write_list_begin(identifier)
154    }
155
156    fn write_list_end(&mut self) -> ::Result<()> {
157        self.inner.write_list_end()
158    }
159
160    fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> ::Result<()> {
161        self.inner.write_set_begin(identifier)
162    }
163
164    fn write_set_end(&mut self) -> ::Result<()> {
165        self.inner.write_set_end()
166    }
167
168    fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()> {
169        self.inner.write_map_begin(identifier)
170    }
171
172    fn write_map_end(&mut self) -> ::Result<()> {
173        self.inner.write_map_end()
174    }
175
176    fn flush(&mut self) -> ::Result<()> {
177        self.inner.flush()
178    }
179
180    // utility
181    //
182
183    fn write_byte(&mut self, b: u8) -> ::Result<()> {
184        self.inner.write_byte(b)
185    }
186}
187
#[cfg(test)]
mod tests {

    use protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
    use transport::{TBufferChannel, TIoChannel, WriteHalf};

    use super::*;

    /// Protocol stack under test: a multiplexed protocol wrapping a binary
    /// protocol that writes to the write half of an in-memory buffer channel.
    type TestProtocol = TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>>;

    /// Build a multiplexed protocol for the service `"foo"` on top of a
    /// binary protocol backed by a 40/40-capacity buffer channel.
    fn test_objects() -> TestProtocol {
        let channel = TBufferChannel::with_capacity(40, 40);
        // Only the write half is needed; the read half is discarded.
        let (_, write_half) = channel.split().unwrap();
        let binary = TBinaryOutputProtocol::new(write_half, true);
        TMultiplexedOutputProtocol::new("foo", binary)
    }

    /// A `Call` named `bar` sent through the `foo` multiplexer must reach the
    /// transport with the name `foo:bar`.
    #[test]
    fn must_write_message_begin_with_prefixed_service_name() {
        let mut o_prot = test_objects();

        let call = TMessageIdentifier::new("bar", TMessageType::Call, 2);
        assert_success!(o_prot.write_message_begin(&call));

        let expected: [u8; 19] = [
            0x80, 0x01, /* protocol identifier */
            0x00, 0x01, /* message type */
            0x00, 0x00, 0x00, 0x07, /* four bytes preceding the 7-byte name */
            0x66, 0x6F, 0x6F, /* "foo" */
            0x3A, /* ":" */
            0x62, 0x61, 0x72, /* "bar" */
            0x00, 0x00, 0x00, 0x02, /* sequence number */
        ];

        assert_eq!(o_prot.inner.transport.write_bytes(), expected);
    }
}
237}