plumber_rs/
pipe.rs

1// Copyright (C) 2018, Hao Hou
2
3//! The Plumber Pipe IO API wrapper
4//!
5//! This module is the wrapper to the actual Plumber pipe API calls for Pipe IO
6
7use ::plumber_api::{runtime_api_pipe_t, runtime_api_pipe_flags_t};
8use ::plumber_api_call::get_cstr;
9
10use std::io::{Read, Write, Result, Error, ErrorKind};
11use std::os::raw::c_void;
12
13use std::io::BufReader;
14
15/**
16 * The integer type for the Plumber pipe flags
17 **/
18pub type PipeFlags = runtime_api_pipe_flags_t;
19
20/**
21 * The integer type used to represent a reference to the Pipe port
22 **/
23pub type PipeDescriptor = runtime_api_pipe_t;
24
// TODO: Currently, because of a limitation of rust-bindgen, all the constant macros with a
//       non-primitive type are missing from the binding file, so we have to define them manually.
27
28/**
29 * Indicates the pipe port is an input side
30 **/
31pub const PIPE_INPUT    :PipeFlags   = 0;
32/**
33 * Indictes the pipe port is an output  side
34 **/
35pub const PIPE_OUTPUT   :PipeFlags   = 0x10000;
36/**
37 * If this flag is set it suggest the Plumber framework to keep the communication resource for more
38 * event even after current resource has been processed
39 **/
40pub const PIPE_PERSIST  :PipeFlags   = 0x20000;
41/**
42 * If this flag is set, it suggest Plumber framework use the async write thread if possible. This
43 * is typically useful when we want to write a large file
44 **/
45pub const PIPE_ASYNC    :PipeFlags   = 0x40000;
46/**
47 * This flag makes the output pipe a copy of input pipe. This is also called a fork, which split
48 * the dataflow into multiple ways.
49 **/
50pub const PIPE_SHADOW   :PipeFlags   = 0x80000;
51/**
52 * The pipe is diable, which is only meaningful when the pipe is a fork of another pipe. It
53 * indicates do not forward data to this fork
54 **/
55pub const PIPE_DISABLED :PipeFlags   = 0x100000;
56
57const PIPE_CNTL_GET_FLAGS:u32        = ::plumber_api::RUNTIME_API_PIPE_CNTL_OPCODE_GET_FLAGS;
58const PIPE_CNTL_SET_FLAG:u32         = ::plumber_api::RUNTIME_API_PIPE_CNTL_OPCODE_SET_FLAG;
59const PIPE_CNTL_CLR_FLAG:u32         = ::plumber_api::RUNTIME_API_PIPE_CNTL_OPCODE_CLR_FLAG;
60const PIPE_CNTL_PUSH_STATE:u32       = ::plumber_api::RUNTIME_API_PIPE_CNTL_OPCODE_PUSH_STATE;
61const PIPE_CNTL_POP_STATE:u32        = ::plumber_api::RUNTIME_API_PIPE_CNTL_OPCODE_POP_STATE;
62
63
64struct PipeCntlData {
65    pipe  : runtime_api_pipe_t,
66    opcode: u32,
67    result: i32
68}
69
70extern "C" fn invoke_pipe_cntl(ap:*mut ::va_list_helper::__va_list_tag, data_ptr:*mut c_void)
71{
72    if let Some(data) = unsafe { (data_ptr as *mut PipeCntlData).as_mut() }
73    {
74        plumber_api_call! {
75            let result = cntl(data.pipe, data.opcode, ap as *mut ::plumber_api::__va_list_tag) in 
76            {
77                data.result = result;
78            }
79        }
80    }
81}
82
// Issue a pipe `cntl` call that takes a variable number of arguments.
//
// * `$pipe`   The pipe descriptor to operate on
// * `$opcode` The control opcode
// * `$args`   The extra arguments, passed on to the crate level `VA_LIST_HELPER`
//
// The macro evaluates to the value `invoke_pipe_cntl` stored for the call. It evaluates to -1
// when `VA_LIST_HELPER` is not set or when the callback did not store anything.
macro_rules! pipe_cntl {
    ($pipe:expr, $opcode:expr, $($args:expr),*) => {
        match unsafe { ::VA_LIST_HELPER } {
            Some(ref helper) => {
                // -1 stays in place unless the callback overwrites it
                let mut request = PipeCntlData {
                    pipe: $pipe,
                    opcode: $opcode,
                    result: -1,
                };
                let request_ptr = &mut request as *mut PipeCntlData;
                unsafe { helper(Some(invoke_pipe_cntl), request_ptr as *mut c_void, $($args),*) }
                request.result
            },
            None => -1,
        }
    }
}
102
103/**
104 * The Rust wrapper of a Plumber pipe port. 
105 *
106 * In Plumber, we use a integer as identifer of the pipe
107 * port when we write the servlet. This is called `pipe_t` in the C API. However, In rust, we
108 * implemented the Pipe object which allows us directly read and write the pipe with the object.
109 *
110 * * `ST`: The type of the state. This is only used when we want to implement a stateful port
111 **/
112#[allow(dead_code)]
113pub struct Pipe<ST> {
114    /// The actual pipe descriptor
115    pipe : runtime_api_pipe_t,
116    /// The phantom data
117    _st  : ::std::marker::PhantomData<ST>
118}
119
120
121/**
122 * A reference to a given pipe port.
123 *
124 * We need this type because the `std::io::bufreader` requires us to give out the ownership of the
125 * inner object to the bufreader. However a pipe port object should be used for each servlet
126 * activation, so we basically can not give it out. So we implement this reference type, so that we
127 * can give the ownership of this object to bufreader without destory the orignal pipe port object.
128 **/
129pub struct PipeRef {
130    /// The target pipe descriptor
131    pipe : runtime_api_pipe_t
132}
133
134impl Read for PipeRef {
135    fn read(&mut self, buf : &mut [u8]) -> Result<usize>
136    {
137        plumber_api_call!{
138            let result = read(self.pipe, buf.as_mut_ptr() as *mut c_void, buf.len()) in {
139                if result as isize != -1
140                {
141                    return Ok(result as usize);
142                }
143                return Err(Error::new(ErrorKind::NotFound, "Plumber pipe_read API returns an error"));
144            }
145        }
146        return Err(Error::new(ErrorKind::Other, "Plumber guest code runtime doesn't fully initailized"));
147    }
148}
149
150impl <ST> Pipe<ST> {
151
152    /**
153     * Get a `std::io::BufReader` object from current pipe port.
154     *
155     * This is useful when we want to do text IO to the pipe
156     *
157     * Returns the ownership of the newly created reader
158     **/
159    pub fn as_bufreader(&self) -> BufReader<PipeRef>
160    {
161        return BufReader::new(PipeRef {
162            pipe : self.pipe
163        });
164    }
165
166    /**
167     * Get the actual pipe descriptor managed by this pipe object
168     *
169     * Return the pipe descriptor
170     **/
171    pub fn as_descriptor(&self) -> PipeDescriptor 
172    {
173        return self.pipe.clone();
174    }
175
176    /**
177     * Define a new pipe port for the current servlet.
178     *
179     * This function creates the pipe port in Rust as well as Plumber framework. Since Plumber only
180     * allows pipe port declaration during the initialization stage, so if this function is called
181     * from execution or cleanup stage, the result will be a failure.
182     *
183     * * `name` The name of the port. It will be used for the dataflow graph construction
184     * * `flags` The initial pipe flag of this pipe. 
185     * * `type_expr` The type expression for the protocol of this pipe port. See Plumber's protocol
186     * typing documentations for detail.
187     *
188     * Returns either `None` on creating failure or `Some` of ownership of the newly created pipe
189     * object
190     **/
191    pub fn define(name:&str, flags: PipeFlags, type_expr:Option<&str>) -> Option<Pipe<ST>>
192    {
193        let (name_ptr, _name) = get_cstr(Some(name));
194        let (type_ptr, _type) = get_cstr(type_expr);
195
196        plumber_api_call!{
197            let result = define(name_ptr, flags, type_ptr) in {
198                if result as i32 != -1
199                {
200                    return Some(Pipe{pipe : result, _st : ::std::marker::PhantomData});
201                }
202            }
203        };
204
205        return None;
206    }
207
208    /**
209     * Check if the pipe contains no more data. 
210     *
211     * This is meaningful only when we are currently executing some execution task with this servlet. 
212     * Which means it only can be called from either `exec` and `async_init`, `async_cleanup` stage
213     * of a servlet. Otherwise it will returns a failure.
214     *
215     * The EOF function in Plumber defines a little bit different from normal EOF. It indicates if
216     * it's possible to have further data.
217     *
218     * If this function returns `true`, it's possible we have more data in the furture, but it's **not** 
219     * means we current have data to read. It's also possible that there's no more data but the
220     * framework is not able to realize that currently. 
221     *
222     * If this function returns `false`, it indicates there are definitely no more data can be read
223     * from this port. 
224     *
225     * Returns either None on error case or the check result
226     **/
227    pub fn eof(&mut self) -> Option<bool>
228    {
229        plumber_api_call!{
230            let result = eof(self.pipe) in {
231                if result as i32 != -1
232                {
233                    return Some(result > 0);
234                }
235                return None;
236            }
237        }
238
239        return None;
240    }
241
242    /**
243     * Get the runtime flags of this port. 
244     *
245     * Since Plumber allows the pipe flag to be changed inside the execution stage. So this
246     * function is used to check what is the current pipe flags.
247     *
248     * Return either None on error or the current pipe flag
249     **/
250    pub fn flags(&mut self) -> Option<PipeFlags> 
251    {
252        let mut pf = 0 as PipeFlags;
253        let pf_ref = &mut pf as *mut PipeFlags;
254
255        if -1 != pipe_cntl!(self.pipe, PIPE_CNTL_GET_FLAGS, pf_ref as *mut c_void)
256        {
257            return Some(pf);
258        }
259        return None;
260    } 
261
262    /**
263     * Test if the pipe port has the required pipe flags been set.
264     *
265     * Returns either None on error or the current pipe flag
266     **/
267    pub fn check_flag(&mut self, flag:PipeFlags) -> Option<bool>
268    {
269        if let Some(result) = self.flags()
270        {
271            return Some((result & flag) == flag);
272        }
273        return None;
274    }
275
276    /**
277     * Set the runtime flags of the pipe port 
278     *
279     * * `flag` The pipe flag we want to add to the pipe
280     *
281     * Return the operation result `None` indicates failure, `Some` Indicates success
282     **/
283    pub fn set_flags(&mut self, flag:PipeFlags) -> Option<()>
284    {
285        if -1 != pipe_cntl!(self.pipe, PIPE_CNTL_SET_FLAG, flag)
286        {
287            return Some(());
288        }
289        return None;
290    }
291
292    /**
293     * Unset the runtime flags for a pipe port
294     *
295     * * `flag` The pipe flag we want to unset
296     *
297     * Return the operation result `None` indicates failure, `Some` for success
298     **/
299    pub fn clear_flags(&mut self, flag:PipeFlags) -> Option<()>
300    {
301        if -1 != pipe_cntl!(self.pipe, PIPE_CNTL_CLR_FLAG, flag)
302        {
303            return Some(());
304        }
305        return None;
306    }
307
308    extern "C" fn dispose_state(ptr : *mut c_void) -> i32
309    {
310        unsafe { Box::from_raw(ptr as *mut ST) };
311        return 0;
312    }
313
314    /**
315     * Get the associated state for current pipe resource.
316     *
317     * Plumber allows stateful pipe port, which means in the execution state, the servlet can
318     * attach a state object with the pipe resource. After the state is attached, and
319     * `PIPE_PERSIST` flag is set, the framework will manage the state for the servlet. 
320     *
321     * When the servlet is active again due to the same communication resource, the object can be
322     * retrieved.
323     *
324     * Returns the retrieved reference to the Obect.
325     *
326     * Note: Plumber framework always manage the ownership of the pushed state objects. So in this
327     * function only a reference will be returned. All the memory management is done by Plumber
328     * rather than Rust.
329     *
330     **/
331    pub fn get_state<'a>(&mut self) -> Option<&'a ST>
332    {
333        let state_ptr = ::std::ptr::null::<ST>() as *mut ST;
334
335        let state_ptr_ref = &state_ptr;
336
337        if -1 != pipe_cntl!(self.pipe, PIPE_CNTL_POP_STATE, state_ptr_ref as *const *mut ST)
338        {
339            if let Some(state) = unsafe{ state_ptr.as_ref() }
340            {
341                return Some(state);
342            }
343        }
344        return None;
345    }
346
347    /**
348     * Push the state object to the pipe. This will attach the state to the pipe communication
349     * resources. 
350     *
351     * See the documentation of `get_state` for more detailed description of state mechanism.
352     *
353     * * `obj`: The box that contains the ownership of the state we want to push
354     *
355     * Return The operation result.
356     *
357     * Note: This function always takes the ownership of the state object, even if it returns a
358     * failure. 
359     **/
360    pub fn push_state(&mut self, obj : Box<ST>) -> Option<()>
361    {
362        let dispose_func_ptr = Self::dispose_state as *const c_void;
363
364        let box_ref = Box::leak(obj);
365
366        let box_ptr = box_ref as *mut ST;
367
368        let void_ptr = box_ptr as *mut c_void;
369
370
371        if -1 != pipe_cntl!(self.pipe, PIPE_CNTL_PUSH_STATE, void_ptr, dispose_func_ptr)
372        {
373            return Some(());
374        }
375        
376        Self::dispose_state(void_ptr);
377
378        return None;
379    }
380
381}
382
383impl <ST> Read for Pipe<ST> {
384    fn read(&mut self, buf: &mut [u8]) -> Result<usize>
385    {
386        plumber_api_call!{
387            let result = read(self.pipe, buf.as_mut_ptr() as *mut c_void, buf.len()) in {
388                if result as isize != -1
389                {
390                    return Ok(result as usize);
391                }
392                return Err(Error::new(ErrorKind::NotFound, "Plumber pipe_read API returns an error"));
393            }
394        }
395        return Err(Error::new(ErrorKind::Other, "Plumber guest code runtime doesn't fully initailized"));
396    }
397}
398
399impl <ST> Write for Pipe<ST> {
400    fn write(&mut self, buf:&[u8]) -> Result<usize>
401    {
402        plumber_api_call!{
403            let result = write(self.pipe, buf.as_ptr() as *mut c_void, buf.len()) in {
404                if result as isize != -1
405                {
406                    return Ok(result as usize);
407                }
408                return Err(Error::new(ErrorKind::NotFound, "Plumber pipe_write API returns an error"));
409            }
410        }
411        return Err(Error::new(ErrorKind::Other, "Plumber guest code runtime doesn't fully initailized"));
412    }
413
414    fn flush(&mut self) -> Result<()>
415    {
416        return Ok(());
417    }
418}