asyncio/streams/
mod.rs

1use core::AsIoContext;
2use async::Handler;
3
4pub trait Stream<E> : AsIoContext + Send + 'static {
5    fn async_read_some<F>(&self, buf: &mut [u8], handler: F) -> F::Output
6        where F: Handler<usize, E>;
7
8    fn async_write_some<F>(&self, buf: &[u8], handler: F) -> F::Output
9        where F: Handler<usize, E>;
10
11    fn read_some(&self, buf: &mut [u8]) -> Result<usize, E>;
12
13    fn write_some(&self, buf: &[u8]) -> Result<usize, E>;
14}
15
16mod match_cond;
17pub use self::match_cond::*;
18
19// pub fn read_until<S, M, E>(s: &S, sbuf: &mut StreamBuf, mut cond: M) -> Result<usize, E>
20//     where S: Stream<E>,
21//           M: MatchCondition,
22//           E: From<io::Error>,
23// {
24//     let mut cur = 0;
25//     loop {
26//         match cond.match_cond(&sbuf.as_slice()[cur..]) {
27//             Ok(len) => return Ok(cur + len),
28//             Err(len) => {
29//                 cur += len;
30//                 let len = try!(s.read_some(try!(sbuf.prepare(4096))));
31//                 sbuf.commit(len);
32//             },
33//         }
34//     }
35// }
36
37// struct ReadUntilHandler<S, M, F, E> {
38//     s: UnsafeRefCell<S>,
39//     sbuf: UnsafeRefCell<StreamBuf>,
40//     cond: M,
41//     handler: F,
42//     cur: usize,
43//     _marker: PhantomData<E>,
44// }
45
46// impl<S, M, F, E> Handler<usize, E> for ReadUntilHandler<S, M, F, E>
47//     where S: Stream<E>,
48//           M: MatchCondition,
49//           F: Handler<usize, E>,
50//           E: From<io::Error> + Send + 'static,
51// {
52//     type Output = F::Output;
53
54//     fn callback(self, io: &IoContext, res: Result<usize, E>) {
55//         let ReadUntilHandler { s, mut sbuf, cond, handler, cur, _marker } = self;
56//         let s = unsafe { s.as_ref() };
57//         match res {
58//             Ok(len) => {
59//                 let sbuf = unsafe { sbuf.as_mut() };
60//                 sbuf.commit(len);
61//                 async_read_until_detail(s, sbuf, cond, handler, cur);
62//             },
63//             Err(err) => handler.callback(io, Err(err)),
64//         }
65//     }
66
67//     fn wrap<G>(self, callback: G) -> Callback
68//         where G: FnOnce(&IoContext, ErrCode, Self) + Send + 'static,
69//     {
70//         let ReadUntilHandler { s, sbuf, cond, handler, cur, _marker } = self;
71//         handler.wrap(move |io, ec, handler| {
72//             callback(io, ec, ReadUntilHandler {
73//                 s: s,
74//                 sbuf: sbuf,
75//                 cond: cond,
76//                 handler: handler,
77//                 cur: cur,
78//                 _marker: PhantomData,
79//             })
80//         })
81//     }
82
83//     type AsyncResult = F::AsyncResult;
84
85//     fn async_result(&self) -> Self::AsyncResult {
86//         self.handler.async_result()
87//     }
88// }
89
90// fn async_read_until_detail<S, M, F, E>(s: &S, sbuf: &mut StreamBuf, mut cond: M, handler: F, mut cur: usize) -> F::Output
91//     where S: Stream<E>,
92//           M: MatchCondition,
93//           F: Handler<usize, E>,
94//           E: From<io::Error> + Send + 'static,
95// {
96//     let ctx = s.context();
97//     let out = handler.async_result();
98//     match cond.match_cond(&sbuf.as_slice()[cur..]) {
99//         Ok(len) => handler.callback(ctx, Ok(cur + len)),
100//         Err(len) => {
101//             cur += len;
102//             let sbuf_ptr = UnsafeRefCell::new(sbuf);
103//             match sbuf.prepare(4096) {
104//                 Ok(buf) => {
105//                     let handler = ReadUntilHandler {
106//                         s: UnsafeRefCell::new(s),
107//                         sbuf: sbuf_ptr,
108//                         cond: cond,
109//                         handler: handler,
110//                         cur: cur,
111//                         _marker: PhantomData,
112//                     };
113//                     s.async_read_some(buf, handler);
114//                 },
115//                 Err(err) => handler.callback(ctx, Err(err.into())),
116//             }
117//         }
118//     }
119//     out.get(ctx)
120// }
121
122// pub fn async_read_until<S, M, F, E>(s: &S, sbuf: &mut StreamBuf, cond: M, handler: F) -> F::Output
123//     where S: Stream<E>,
124//           M: MatchCondition,
125//           F: Handler<usize, E>,
126//           E: From<io::Error> + Send + 'static,
127// {
128//     async_read_until_detail(s, sbuf, cond, handler, 0)
129// }
130
131// pub fn write_until<S, M, E>(s: &S, sbuf: &mut StreamBuf, mut cond: M) -> Result<usize, E>
132//     where S: Stream<E>,
133//           M: MatchCondition,
134// {
135//     let len = {
136//         let len = match cond.match_cond(sbuf.as_slice()) {
137//             Ok(len) => len,
138//             Err(len) => len,
139//         };
140//         try!(s.write_some(&sbuf.as_slice()[..len]))
141//     };
142//     sbuf.consume(len);
143//     Ok(len)
144// }
145
146// struct WriteUntilHandler<S, F, E> {
147//     s: UnsafeRefCell<S>,
148//     sbuf: UnsafeRefCell<StreamBuf>,
149//     handler: F,
150//     total: usize,
151//     cur: usize,
152//     _marker: PhantomData<E>,
153// }
154
155// impl<S, F, E> Handler<usize, E> for WriteUntilHandler<S, F, E>
156//     where S: Stream<E>,
157//           F: Handler<usize, E>,
158//           E: Send + 'static,
159// {
160//     type Output = F::Output;
161
162//     fn callback(self, io: &IoContext, res: Result<usize, E>) {
163//         let WriteUntilHandler { s, mut sbuf, handler, total, mut cur, _marker } = self;
164//         let s = unsafe { s.as_ref() };
165//         match res {
166//             Ok(len) => {
167//                 let sbuf = unsafe { sbuf.as_mut() };
168//                 sbuf.consume(len);
169//                 cur -= len;
170//                 if cur == 0 {
171//                     handler.callback(io, Ok(total))
172//                 } else {
173//                     async_write_until_detail(s, sbuf, len, handler, cur);
174//                 }
175//             },
176//             Err(err) => handler.callback(io, Err(err)),
177//         }
178//     }
179
180//     fn wrap<G>(self, callback: G) -> Callback
181//         where G: FnOnce(&IoContext, ErrCode, Self) + Send + 'static,
182//     {
183//         let WriteUntilHandler { s, sbuf, handler, total, cur, _marker } = self;
184//         handler.wrap(move |io, ec, handler| {
185//             callback(io, ec, WriteUntilHandler {
186//                 s: s,
187//                 sbuf: sbuf,
188//                 handler: handler,
189//                 total: total,
190//                 cur: cur,
191//                 _marker: _marker,
192//             })
193//         })
194//     }
195
196//     type AsyncResult = F::AsyncResult;
197
198//     fn async_result(&self) -> Self::AsyncResult {
199//         self.handler.async_result()
200//     }
201// }
202
203// fn async_write_until_detail<S, F, E>(s: &S, sbuf: &mut StreamBuf, total: usize, handler: F, cur: usize) -> F::Output
204//     where S: Stream<E>,
205//           F: Handler<usize, E>,
206//           E: Send + 'static,
207// {
208//     let handler = WriteUntilHandler {
209//         s: UnsafeRefCell::new(s),
210//         sbuf: UnsafeRefCell::new(sbuf),
211//         handler: handler,
212//         total: total,
213//         cur: cur,
214//         _marker: PhantomData,
215//     };
216//     s.async_write_some(&sbuf.as_slice()[..cur], handler)
217// }
218
219// pub fn async_write_until<S, M, F, E>(s: &S, sbuf: &mut StreamBuf, mut cond: M, handler: F) -> F::Output
220//     where S: Stream<E>,
221//           M: MatchCondition,
222//           F: Handler<usize, E>,
223//           E: From<io::Error> + Send + 'static,
224// {
225//     let total = match cond.match_cond(sbuf.as_slice()) {
226//         Ok(len) => len,
227//         Err(len) => len,
228//     };
229//     async_write_until_detail(s, sbuf, total, handler, total)
230// }