asyncio/streams/mod.rs
1use core::AsIoContext;
2use async::Handler;
3
/// Interface for a byte stream that exposes each read/write operation in two
/// forms: one that takes a completion `handler` (`async_*`) and one that
/// returns a `Result` directly (`*_some`).
///
/// `E` is the error type of the stream: it is the `Err` side of the `Result`
/// returned by `read_some` / `write_some`, and the second type argument of the
/// `Handler<usize, E>` bound on the `async_*` methods.
///
/// Implementors must also be `AsIoContext`, `Send` and `'static`.
4pub trait Stream<E> : AsIoContext + Send + 'static {
 /// Read operation driven by a completion `handler` rather than a returned
 /// `Result`.
 ///
 /// * `buf` - destination buffer for the incoming bytes.
 /// * `handler` - any `Handler<usize, E>`; its associated `Output` type is
 ///   what this method returns.
 ///
 /// NOTE(review): presumably the handler is eventually given `Ok(n)` with the
 /// number of bytes placed in `buf`, or `Err(e)` on failure, and `buf` must
 /// stay valid until then - confirm against the `Handler` contract.
5 fn async_read_some<F>(&self, buf: &mut [u8], handler: F) -> F::Output
6 where F: Handler<usize, E>;
7
 /// Write operation driven by a completion `handler` rather than a returned
 /// `Result`.
 ///
 /// * `buf` - source bytes to send.
 /// * `handler` - any `Handler<usize, E>`; its associated `Output` type is
 ///   what this method returns.
 ///
 /// NOTE(review): presumably the handler is eventually given `Ok(n)` with the
 /// number of bytes taken from `buf` (possibly fewer than `buf.len()`, as the
 /// `_some` suffix suggests), or `Err(e)` - confirm against the `Handler`
 /// contract.
8 fn async_write_some<F>(&self, buf: &[u8], handler: F) -> F::Output
9 where F: Handler<usize, E>;
10
 /// Read into `buf`, returning the outcome directly as `Result<usize, E>`.
 ///
 /// NOTE(review): the `usize` is presumably the number of bytes read, which
 /// may be less than `buf.len()`; whether the call blocks is not visible
 /// from this declaration - confirm with the implementors.
11 fn read_some(&self, buf: &mut [u8]) -> Result<usize, E>;
12
 /// Write from `buf`, returning the outcome directly as `Result<usize, E>`.
 ///
 /// NOTE(review): the `usize` is presumably the number of bytes written,
 /// which may be less than `buf.len()`; whether the call blocks is not
 /// visible from this declaration - confirm with the implementors.
13 fn write_some(&self, buf: &[u8]) -> Result<usize, E>;
14}
15
16mod match_cond;
17pub use self::match_cond::*;
18
19// pub fn read_until<S, M, E>(s: &S, sbuf: &mut StreamBuf, mut cond: M) -> Result<usize, E>
20// where S: Stream<E>,
21// M: MatchCondition,
22// E: From<io::Error>,
23// {
24// let mut cur = 0;
25// loop {
26// match cond.match_cond(&sbuf.as_slice()[cur..]) {
27// Ok(len) => return Ok(cur + len),
28// Err(len) => {
29// cur += len;
30// let len = try!(s.read_some(try!(sbuf.prepare(4096))));
31// sbuf.commit(len);
32// },
33// }
34// }
35// }
36
37// struct ReadUntilHandler<S, M, F, E> {
38// s: UnsafeRefCell<S>,
39// sbuf: UnsafeRefCell<StreamBuf>,
40// cond: M,
41// handler: F,
42// cur: usize,
43// _marker: PhantomData<E>,
44// }
45
46// impl<S, M, F, E> Handler<usize, E> for ReadUntilHandler<S, M, F, E>
47// where S: Stream<E>,
48// M: MatchCondition,
49// F: Handler<usize, E>,
50// E: From<io::Error> + Send + 'static,
51// {
52// type Output = F::Output;
53
54// fn callback(self, io: &IoContext, res: Result<usize, E>) {
55// let ReadUntilHandler { s, mut sbuf, cond, handler, cur, _marker } = self;
56// let s = unsafe { s.as_ref() };
57// match res {
58// Ok(len) => {
59// let sbuf = unsafe { sbuf.as_mut() };
60// sbuf.commit(len);
61// async_read_until_detail(s, sbuf, cond, handler, cur);
62// },
63// Err(err) => handler.callback(io, Err(err)),
64// }
65// }
66
67// fn wrap<G>(self, callback: G) -> Callback
68// where G: FnOnce(&IoContext, ErrCode, Self) + Send + 'static,
69// {
70// let ReadUntilHandler { s, sbuf, cond, handler, cur, _marker } = self;
71// handler.wrap(move |io, ec, handler| {
72// callback(io, ec, ReadUntilHandler {
73// s: s,
74// sbuf: sbuf,
75// cond: cond,
76// handler: handler,
77// cur: cur,
78// _marker: PhantomData,
79// })
80// })
81// }
82
83// type AsyncResult = F::AsyncResult;
84
85// fn async_result(&self) -> Self::AsyncResult {
86// self.handler.async_result()
87// }
88// }
89
90// fn async_read_until_detail<S, M, F, E>(s: &S, sbuf: &mut StreamBuf, mut cond: M, handler: F, mut cur: usize) -> F::Output
91// where S: Stream<E>,
92// M: MatchCondition,
93// F: Handler<usize, E>,
94// E: From<io::Error> + Send + 'static,
95// {
96// let ctx = s.context();
97// let out = handler.async_result();
98// match cond.match_cond(&sbuf.as_slice()[cur..]) {
99// Ok(len) => handler.callback(ctx, Ok(cur + len)),
100// Err(len) => {
101// cur += len;
102// let sbuf_ptr = UnsafeRefCell::new(sbuf);
103// match sbuf.prepare(4096) {
104// Ok(buf) => {
105// let handler = ReadUntilHandler {
106// s: UnsafeRefCell::new(s),
107// sbuf: sbuf_ptr,
108// cond: cond,
109// handler: handler,
110// cur: cur,
111// _marker: PhantomData,
112// };
113// s.async_read_some(buf, handler);
114// },
115// Err(err) => handler.callback(ctx, Err(err.into())),
116// }
117// }
118// }
119// out.get(ctx)
120// }
121
122// pub fn async_read_until<S, M, F, E>(s: &S, sbuf: &mut StreamBuf, cond: M, handler: F) -> F::Output
123// where S: Stream<E>,
124// M: MatchCondition,
125// F: Handler<usize, E>,
126// E: From<io::Error> + Send + 'static,
127// {
128// async_read_until_detail(s, sbuf, cond, handler, 0)
129// }
130
131// pub fn write_until<S, M, E>(s: &S, sbuf: &mut StreamBuf, mut cond: M) -> Result<usize, E>
132// where S: Stream<E>,
133// M: MatchCondition,
134// {
135// let len = {
136// let len = match cond.match_cond(sbuf.as_slice()) {
137// Ok(len) => len,
138// Err(len) => len,
139// };
140// try!(s.write_some(&sbuf.as_slice()[..len]))
141// };
142// sbuf.consume(len);
143// Ok(len)
144// }
145
146// struct WriteUntilHandler<S, F, E> {
147// s: UnsafeRefCell<S>,
148// sbuf: UnsafeRefCell<StreamBuf>,
149// handler: F,
150// total: usize,
151// cur: usize,
152// _marker: PhantomData<E>,
153// }
154
155// impl<S, F, E> Handler<usize, E> for WriteUntilHandler<S, F, E>
156// where S: Stream<E>,
157// F: Handler<usize, E>,
158// E: Send + 'static,
159// {
160// type Output = F::Output;
161
162// fn callback(self, io: &IoContext, res: Result<usize, E>) {
163// let WriteUntilHandler { s, mut sbuf, handler, total, mut cur, _marker } = self;
164// let s = unsafe { s.as_ref() };
165// match res {
166// Ok(len) => {
167// let sbuf = unsafe { sbuf.as_mut() };
168// sbuf.consume(len);
169// cur -= len;
170// if cur == 0 {
171// handler.callback(io, Ok(total))
172// } else {
173// async_write_until_detail(s, sbuf, len, handler, cur);
174// }
175// },
176// Err(err) => handler.callback(io, Err(err)),
177// }
178// }
179
180// fn wrap<G>(self, callback: G) -> Callback
181// where G: FnOnce(&IoContext, ErrCode, Self) + Send + 'static,
182// {
183// let WriteUntilHandler { s, sbuf, handler, total, cur, _marker } = self;
184// handler.wrap(move |io, ec, handler| {
185// callback(io, ec, WriteUntilHandler {
186// s: s,
187// sbuf: sbuf,
188// handler: handler,
189// total: total,
190// cur: cur,
191// _marker: _marker,
192// })
193// })
194// }
195
196// type AsyncResult = F::AsyncResult;
197
198// fn async_result(&self) -> Self::AsyncResult {
199// self.handler.async_result()
200// }
201// }
202
203// fn async_write_until_detail<S, F, E>(s: &S, sbuf: &mut StreamBuf, total: usize, handler: F, cur: usize) -> F::Output
204// where S: Stream<E>,
205// F: Handler<usize, E>,
206// E: Send + 'static,
207// {
208// let handler = WriteUntilHandler {
209// s: UnsafeRefCell::new(s),
210// sbuf: UnsafeRefCell::new(sbuf),
211// handler: handler,
212// total: total,
213// cur: cur,
214// _marker: PhantomData,
215// };
216// s.async_write_some(&sbuf.as_slice()[..cur], handler)
217// }
218
219// pub fn async_write_until<S, M, F, E>(s: &S, sbuf: &mut StreamBuf, mut cond: M, handler: F) -> F::Output
220// where S: Stream<E>,
221// M: MatchCondition,
222// F: Handler<usize, E>,
223// E: From<io::Error> + Send + 'static,
224// {
225// let total = match cond.match_cond(sbuf.as_slice()) {
226// Ok(len) => len,
227// Err(len) => len,
228// };
229// async_write_until_detail(s, sbuf, total, handler, total)
230// }