ntex_io/
utils.rs

1use ntex_service::{chain_factory, fn_service, ServiceFactory};
2use ntex_util::future::Ready;
3
4use crate::{Filter, Io, IoBoxed};
5
/// Result of decoding a single item from a buffer.
///
/// All comparison and hashing traits are derived, so two values are compared
/// field by field in declaration order (`item`, then `remains`, then
/// `consumed`).
#[doc(hidden)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Decoded<T> {
    /// The decoded item, or `None` when no item was produced.
    pub item: Option<T>,
    /// NOTE(review): presumably the number of bytes still left in the buffer — confirm.
    pub remains: usize,
    /// NOTE(review): presumably the number of bytes consumed from the buffer — confirm.
    pub consumed: usize,
}
14
15/// Service that converts any `Io<F>` stream to IoBoxed stream
16pub fn seal<F, S, C>(
17    srv: S,
18) -> impl ServiceFactory<
19    Io<F>,
20    C,
21    Response = S::Response,
22    Error = S::Error,
23    InitError = S::InitError,
24>
25where
26    F: Filter,
27    S: ServiceFactory<IoBoxed, C>,
28    C: Clone,
29{
30    chain_factory(fn_service(|io: Io<F>| Ready::Ok(io.boxed())))
31        .map_init_err(|_| panic!())
32        .and_then(srv)
33}
34
#[cfg(test)]
mod tests {
    use ntex_bytes::Bytes;
    use ntex_codec::BytesCodec;

    use super::*;
    use crate::{buf::Stack, filter::NullFilter, testing::IoTest, FilterCtx};

    /// A service wrapped with `seal` receives the client's bytes through an
    /// `IoBoxed` stream and its reply reaches the client.
    #[ntex::test]
    async fn test_utils() {
        let (client, server) = IoTest::create();
        client.remote_buffer_cap(1024);
        client.write("REQ");

        // Handler that works on the boxed stream: read the request, answer it.
        let handler = fn_service(|io: IoBoxed| async move {
            let request = io.recv(&BytesCodec).await.unwrap().unwrap();
            assert_eq!(request, b"REQ".as_ref());
            io.send(Bytes::from_static(b"RES"), &BytesCodec)
                .await
                .unwrap();
            Ok::<_, ()>(())
        });
        let pipeline = seal(handler).pipeline(()).await.unwrap();
        let _ = pipeline.call(Io::new(server)).await;

        let response = client.read().await.unwrap();
        assert_eq!(response, b"RES".as_ref());
    }

    /// Exercises every `NullFilter` method and checks the value each returns.
    #[ntex::test]
    async fn test_null_filter() {
        let (_, server) = IoTest::create();
        let io = Io::new(server);
        let ioref = io.get_ref();
        let stack = Stack::new();

        let queried = NullFilter.query(std::any::TypeId::of::<()>());
        assert!(queried.is_none());

        let shutdown = NullFilter
            .shutdown(FilterCtx::new(&ioref, &stack))
            .unwrap();
        assert!(shutdown.is_ready());

        let read_ready = std::future::poll_fn(|cx| NullFilter.poll_read_ready(cx)).await;
        assert_eq!(read_ready, crate::Readiness::Terminate);

        let write_ready = std::future::poll_fn(|cx| NullFilter.poll_write_ready(cx)).await;
        assert_eq!(write_ready, crate::Readiness::Terminate);

        let write_result = NullFilter.process_write_buf(FilterCtx::new(&ioref, &stack));
        assert!(write_result.is_ok());

        let read_status = NullFilter
            .process_read_buf(FilterCtx::new(&ioref, &stack), 0)
            .unwrap();
        assert_eq!(read_status, Default::default());
    }
}