plumber_rs/pipe.rs
1// Copyright (C) 2018, Hao Hou
2
3//! The Plumber Pipe IO API wrapper
4//!
5//! This module is the wrapper to the actual Plumber pipe API calls for Pipe IO
6
7use ::plumber_api::{runtime_api_pipe_t, runtime_api_pipe_flags_t};
8use ::plumber_api_call::get_cstr;
9
10use std::io::{Read, Write, Result, Error, ErrorKind};
11use std::os::raw::c_void;
12
13use std::io::BufReader;
14
15/**
16 * The integer type for the Plumber pipe flags
17 **/
18pub type PipeFlags = runtime_api_pipe_flags_t;
19
20/**
21 * The integer type used to represent a reference to the Pipe port
22 **/
23pub type PipeDescriptor = runtime_api_pipe_t;
24
// TODO: Currently, because of a limitation of rust-bindgen, all the constant macros with a
// non-primitive type are missing from the binding file, so we have to define them manually.
27
28/**
29 * Indicates the pipe port is an input side
30 **/
31pub const PIPE_INPUT :PipeFlags = 0;
32/**
33 * Indictes the pipe port is an output side
34 **/
35pub const PIPE_OUTPUT :PipeFlags = 0x10000;
36/**
37 * If this flag is set it suggest the Plumber framework to keep the communication resource for more
38 * event even after current resource has been processed
39 **/
40pub const PIPE_PERSIST :PipeFlags = 0x20000;
41/**
42 * If this flag is set, it suggest Plumber framework use the async write thread if possible. This
43 * is typically useful when we want to write a large file
44 **/
45pub const PIPE_ASYNC :PipeFlags = 0x40000;
46/**
47 * This flag makes the output pipe a copy of input pipe. This is also called a fork, which split
48 * the dataflow into multiple ways.
49 **/
50pub const PIPE_SHADOW :PipeFlags = 0x80000;
51/**
52 * The pipe is diable, which is only meaningful when the pipe is a fork of another pipe. It
53 * indicates do not forward data to this fork
54 **/
55pub const PIPE_DISABLED :PipeFlags = 0x100000;
56
57const PIPE_CNTL_GET_FLAGS:u32 = ::plumber_api::RUNTIME_API_PIPE_CNTL_OPCODE_GET_FLAGS;
58const PIPE_CNTL_SET_FLAG:u32 = ::plumber_api::RUNTIME_API_PIPE_CNTL_OPCODE_SET_FLAG;
59const PIPE_CNTL_CLR_FLAG:u32 = ::plumber_api::RUNTIME_API_PIPE_CNTL_OPCODE_CLR_FLAG;
60const PIPE_CNTL_PUSH_STATE:u32 = ::plumber_api::RUNTIME_API_PIPE_CNTL_OPCODE_PUSH_STATE;
61const PIPE_CNTL_POP_STATE:u32 = ::plumber_api::RUNTIME_API_PIPE_CNTL_OPCODE_POP_STATE;
62
63
64struct PipeCntlData {
65 pipe : runtime_api_pipe_t,
66 opcode: u32,
67 result: i32
68}
69
70extern "C" fn invoke_pipe_cntl(ap:*mut ::va_list_helper::__va_list_tag, data_ptr:*mut c_void)
71{
72 if let Some(data) = unsafe { (data_ptr as *mut PipeCntlData).as_mut() }
73 {
74 plumber_api_call! {
75 let result = cntl(data.pipe, data.opcode, ap as *mut ::plumber_api::__va_list_tag) in
76 {
77 data.result = result;
78 }
79 }
80 }
81}
82
// Issues a pipe control call with a variable number of extra arguments.
//
// The extra arguments are handed to the global `VA_LIST_HELPER`, which calls back into
// `invoke_pipe_cntl` with them. The macro evaluates to the `i32` stored in the `result` field,
// which starts out as -1, or to -1 when no helper is installed.
macro_rules! pipe_cntl {
    ($pipe:expr, $opcode:expr, $($args:expr),*) => {
        match unsafe { ::VA_LIST_HELPER } {
            Some(ref va_helper) => {
                let mut cntl_data = PipeCntlData {
                    pipe  : $pipe,
                    opcode: $opcode,
                    // Stays -1 unless the callback overwrites it.
                    result: -1
                };
                let cntl_data_ptr = &mut cntl_data as *mut PipeCntlData as *mut c_void;
                unsafe { va_helper(Some(invoke_pipe_cntl), cntl_data_ptr, $($args),*); }
                cntl_data.result
            },
            None => -1
        }
    }
}
102
103/**
104 * The Rust wrapper of a Plumber pipe port.
105 *
106 * In Plumber, we use a integer as identifer of the pipe
107 * port when we write the servlet. This is called `pipe_t` in the C API. However, In rust, we
108 * implemented the Pipe object which allows us directly read and write the pipe with the object.
109 *
110 * * `ST`: The type of the state. This is only used when we want to implement a stateful port
111 **/
112#[allow(dead_code)]
113pub struct Pipe<ST> {
114 /// The actual pipe descriptor
115 pipe : runtime_api_pipe_t,
116 /// The phantom data
117 _st : ::std::marker::PhantomData<ST>
118}
119
120
121/**
122 * A reference to a given pipe port.
123 *
124 * We need this type because the `std::io::bufreader` requires us to give out the ownership of the
125 * inner object to the bufreader. However a pipe port object should be used for each servlet
126 * activation, so we basically can not give it out. So we implement this reference type, so that we
127 * can give the ownership of this object to bufreader without destory the orignal pipe port object.
128 **/
129pub struct PipeRef {
130 /// The target pipe descriptor
131 pipe : runtime_api_pipe_t
132}
133
134impl Read for PipeRef {
135 fn read(&mut self, buf : &mut [u8]) -> Result<usize>
136 {
137 plumber_api_call!{
138 let result = read(self.pipe, buf.as_mut_ptr() as *mut c_void, buf.len()) in {
139 if result as isize != -1
140 {
141 return Ok(result as usize);
142 }
143 return Err(Error::new(ErrorKind::NotFound, "Plumber pipe_read API returns an error"));
144 }
145 }
146 return Err(Error::new(ErrorKind::Other, "Plumber guest code runtime doesn't fully initailized"));
147 }
148}
149
150impl <ST> Pipe<ST> {
151
152 /**
153 * Get a `std::io::BufReader` object from current pipe port.
154 *
155 * This is useful when we want to do text IO to the pipe
156 *
157 * Returns the ownership of the newly created reader
158 **/
159 pub fn as_bufreader(&self) -> BufReader<PipeRef>
160 {
161 return BufReader::new(PipeRef {
162 pipe : self.pipe
163 });
164 }
165
166 /**
167 * Get the actual pipe descriptor managed by this pipe object
168 *
169 * Return the pipe descriptor
170 **/
171 pub fn as_descriptor(&self) -> PipeDescriptor
172 {
173 return self.pipe.clone();
174 }
175
176 /**
177 * Define a new pipe port for the current servlet.
178 *
179 * This function creates the pipe port in Rust as well as Plumber framework. Since Plumber only
180 * allows pipe port declaration during the initialization stage, so if this function is called
181 * from execution or cleanup stage, the result will be a failure.
182 *
183 * * `name` The name of the port. It will be used for the dataflow graph construction
184 * * `flags` The initial pipe flag of this pipe.
185 * * `type_expr` The type expression for the protocol of this pipe port. See Plumber's protocol
186 * typing documentations for detail.
187 *
188 * Returns either `None` on creating failure or `Some` of ownership of the newly created pipe
189 * object
190 **/
191 pub fn define(name:&str, flags: PipeFlags, type_expr:Option<&str>) -> Option<Pipe<ST>>
192 {
193 let (name_ptr, _name) = get_cstr(Some(name));
194 let (type_ptr, _type) = get_cstr(type_expr);
195
196 plumber_api_call!{
197 let result = define(name_ptr, flags, type_ptr) in {
198 if result as i32 != -1
199 {
200 return Some(Pipe{pipe : result, _st : ::std::marker::PhantomData});
201 }
202 }
203 };
204
205 return None;
206 }
207
208 /**
209 * Check if the pipe contains no more data.
210 *
211 * This is meaningful only when we are currently executing some execution task with this servlet.
212 * Which means it only can be called from either `exec` and `async_init`, `async_cleanup` stage
213 * of a servlet. Otherwise it will returns a failure.
214 *
215 * The EOF function in Plumber defines a little bit different from normal EOF. It indicates if
216 * it's possible to have further data.
217 *
218 * If this function returns `true`, it's possible we have more data in the furture, but it's **not**
219 * means we current have data to read. It's also possible that there's no more data but the
220 * framework is not able to realize that currently.
221 *
222 * If this function returns `false`, it indicates there are definitely no more data can be read
223 * from this port.
224 *
225 * Returns either None on error case or the check result
226 **/
227 pub fn eof(&mut self) -> Option<bool>
228 {
229 plumber_api_call!{
230 let result = eof(self.pipe) in {
231 if result as i32 != -1
232 {
233 return Some(result > 0);
234 }
235 return None;
236 }
237 }
238
239 return None;
240 }
241
242 /**
243 * Get the runtime flags of this port.
244 *
245 * Since Plumber allows the pipe flag to be changed inside the execution stage. So this
246 * function is used to check what is the current pipe flags.
247 *
248 * Return either None on error or the current pipe flag
249 **/
250 pub fn flags(&mut self) -> Option<PipeFlags>
251 {
252 let mut pf = 0 as PipeFlags;
253 let pf_ref = &mut pf as *mut PipeFlags;
254
255 if -1 != pipe_cntl!(self.pipe, PIPE_CNTL_GET_FLAGS, pf_ref as *mut c_void)
256 {
257 return Some(pf);
258 }
259 return None;
260 }
261
262 /**
263 * Test if the pipe port has the required pipe flags been set.
264 *
265 * Returns either None on error or the current pipe flag
266 **/
267 pub fn check_flag(&mut self, flag:PipeFlags) -> Option<bool>
268 {
269 if let Some(result) = self.flags()
270 {
271 return Some((result & flag) == flag);
272 }
273 return None;
274 }
275
276 /**
277 * Set the runtime flags of the pipe port
278 *
279 * * `flag` The pipe flag we want to add to the pipe
280 *
281 * Return the operation result `None` indicates failure, `Some` Indicates success
282 **/
283 pub fn set_flags(&mut self, flag:PipeFlags) -> Option<()>
284 {
285 if -1 != pipe_cntl!(self.pipe, PIPE_CNTL_SET_FLAG, flag)
286 {
287 return Some(());
288 }
289 return None;
290 }
291
292 /**
293 * Unset the runtime flags for a pipe port
294 *
295 * * `flag` The pipe flag we want to unset
296 *
297 * Return the operation result `None` indicates failure, `Some` for success
298 **/
299 pub fn clear_flags(&mut self, flag:PipeFlags) -> Option<()>
300 {
301 if -1 != pipe_cntl!(self.pipe, PIPE_CNTL_CLR_FLAG, flag)
302 {
303 return Some(());
304 }
305 return None;
306 }
307
308 extern "C" fn dispose_state(ptr : *mut c_void) -> i32
309 {
310 unsafe { Box::from_raw(ptr as *mut ST) };
311 return 0;
312 }
313
314 /**
315 * Get the associated state for current pipe resource.
316 *
317 * Plumber allows stateful pipe port, which means in the execution state, the servlet can
318 * attach a state object with the pipe resource. After the state is attached, and
319 * `PIPE_PERSIST` flag is set, the framework will manage the state for the servlet.
320 *
321 * When the servlet is active again due to the same communication resource, the object can be
322 * retrieved.
323 *
324 * Returns the retrieved reference to the Obect.
325 *
326 * Note: Plumber framework always manage the ownership of the pushed state objects. So in this
327 * function only a reference will be returned. All the memory management is done by Plumber
328 * rather than Rust.
329 *
330 **/
331 pub fn get_state<'a>(&mut self) -> Option<&'a ST>
332 {
333 let state_ptr = ::std::ptr::null::<ST>() as *mut ST;
334
335 let state_ptr_ref = &state_ptr;
336
337 if -1 != pipe_cntl!(self.pipe, PIPE_CNTL_POP_STATE, state_ptr_ref as *const *mut ST)
338 {
339 if let Some(state) = unsafe{ state_ptr.as_ref() }
340 {
341 return Some(state);
342 }
343 }
344 return None;
345 }
346
347 /**
348 * Push the state object to the pipe. This will attach the state to the pipe communication
349 * resources.
350 *
351 * See the documentation of `get_state` for more detailed description of state mechanism.
352 *
353 * * `obj`: The box that contains the ownership of the state we want to push
354 *
355 * Return The operation result.
356 *
357 * Note: This function always takes the ownership of the state object, even if it returns a
358 * failure.
359 **/
360 pub fn push_state(&mut self, obj : Box<ST>) -> Option<()>
361 {
362 let dispose_func_ptr = Self::dispose_state as *const c_void;
363
364 let box_ref = Box::leak(obj);
365
366 let box_ptr = box_ref as *mut ST;
367
368 let void_ptr = box_ptr as *mut c_void;
369
370
371 if -1 != pipe_cntl!(self.pipe, PIPE_CNTL_PUSH_STATE, void_ptr, dispose_func_ptr)
372 {
373 return Some(());
374 }
375
376 Self::dispose_state(void_ptr);
377
378 return None;
379 }
380
381}
382
383impl <ST> Read for Pipe<ST> {
384 fn read(&mut self, buf: &mut [u8]) -> Result<usize>
385 {
386 plumber_api_call!{
387 let result = read(self.pipe, buf.as_mut_ptr() as *mut c_void, buf.len()) in {
388 if result as isize != -1
389 {
390 return Ok(result as usize);
391 }
392 return Err(Error::new(ErrorKind::NotFound, "Plumber pipe_read API returns an error"));
393 }
394 }
395 return Err(Error::new(ErrorKind::Other, "Plumber guest code runtime doesn't fully initailized"));
396 }
397}
398
399impl <ST> Write for Pipe<ST> {
400 fn write(&mut self, buf:&[u8]) -> Result<usize>
401 {
402 plumber_api_call!{
403 let result = write(self.pipe, buf.as_ptr() as *mut c_void, buf.len()) in {
404 if result as isize != -1
405 {
406 return Ok(result as usize);
407 }
408 return Err(Error::new(ErrorKind::NotFound, "Plumber pipe_write API returns an error"));
409 }
410 }
411 return Err(Error::new(ErrorKind::Other, "Plumber guest code runtime doesn't fully initailized"));
412 }
413
414 fn flush(&mut self) -> Result<()>
415 {
416 return Ok(());
417 }
418}