termion_input_tokio/
lib.rs1use std::{future, io, pin::Pin};
2use std::{
3 iter::empty,
4 task::{Context, Poll},
5};
6
7use bytes::{Buf, Bytes, BytesMut};
8use futures::{Stream, StreamExt, stream::BoxStream};
9use termion::event::{self, Event, Key};
10use tokio::io::AsyncRead;
11use tokio_util::codec::{Decoder, FramedRead};
12
/// Stream type returned by `TermReadAsync::events_and_raw_stream`: a
/// [`FramedRead`] over the reader `R` that frames its bytes with
/// [`EventsAndRawDecoder`].
type EventsAndRawStream<R> = FramedRead<R, EventsAndRawDecoder>;
15
/// Stateless decoder that turns raw terminal input bytes into terminal events.
///
/// The type carries no data, so it is freely copyable and can be created with
/// either the unit literal or `Default::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct EventsAndRawDecoder;
17
18impl Decoder for EventsAndRawDecoder {
19 type Item = (Event, Vec<u8>);
20 type Error = io::Error;
21
22 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
23 match src.len() {
24 0 => Ok(None),
25 1 => match src[0] {
26 b'\x1B' => {
27 src.advance(1);
28 Ok(Some((Event::Key(Key::Esc), vec![b'\x1B'])))
29 }
30 c => {
31 if let Ok(res) = parse_event(c, &mut empty()) {
32 src.advance(1);
33 Ok(Some(res))
34 } else {
35 Ok(None)
36 }
37 }
38 },
39 _ => {
40 let (off, res) = if let Some((c, cs)) = src.split_first() {
41 let cur = Bytes::copy_from_slice(cs);
42 let mut it = cur.into_iter().map(Ok);
43 if let Ok(res) = parse_event(*c, &mut it) {
44 (1 + cs.len() - it.len(), Ok(Some(res)))
45 } else {
46 (0, Ok(None))
47 }
48 } else {
49 (0, Ok(None))
50 };
51
52 src.advance(off);
53 res
54 }
55 }
56 }
57}
58
59fn parse_event<I>(item: u8, iter: &mut I) -> Result<(Event, Vec<u8>), io::Error>
60where
61 I: Iterator<Item = Result<u8, io::Error>>,
62{
63 let mut buf = vec![item];
64 let result = {
65 let mut iter = iter.inspect(|byte| {
66 if let &Ok(byte) = byte {
67 buf.push(byte);
68 }
69 });
70 event::parse_event(item, &mut iter)
71 };
72 result
73 .or(Ok(Event::Unsupported(buf.clone())))
74 .map(|e| (e, buf))
75}
76
/// A boxed stream yielding `Result<T, io::Error>` items.
///
/// Wraps a `'static` [`BoxStream`] so the concrete combinator types used to
/// build it do not appear in public signatures.
pub struct InputStream<T>(BoxStream<'static, Result<T, io::Error>>);
79
80impl<T> InputStream<T> {
81 fn new<S: Stream<Item = Result<T, io::Error>> + Send + 'static>(stream: S) -> Self {
82 InputStream(Box::pin(stream))
83 }
84}
85
86impl<T> Stream for InputStream<T> {
87 type Item = Result<T, io::Error>;
88
89 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90 self.0.poll_next_unpin(cx)
91 }
92}
93
94pub trait TermReadAsync: Sized {
96 fn events_stream(self) -> InputStream<Event>;
98
99 fn keys_stream(self) -> InputStream<Key>;
101
102 fn events_and_raw_stream(self) -> EventsAndRawStream<Self>
104 where
105 Self: Sized;
106}
107
108impl<R: 'static + Send + AsyncRead> TermReadAsync for R {
109 fn events_stream(self) -> InputStream<Event> {
110 InputStream::new(
111 self.events_and_raw_stream()
112 .map(|event_and_raw| match event_and_raw {
113 Ok((event, _raw)) => Ok(event),
114 Err(e) => Err(e),
115 }),
116 )
117 }
118
119 fn keys_stream(self) -> InputStream<Key> {
120 InputStream::new(self.events_stream().filter_map(|event| {
121 future::ready(match event {
122 Ok(Event::Key(k)) => Some(Ok(k)),
123 Ok(_) => None,
124 Err(e) => Some(Err(e)),
125 })
126 }))
127 }
128
129 fn events_and_raw_stream(self) -> EventsAndRawStream<Self> {
130 FramedRead::new(self, EventsAndRawDecoder)
131 }
132}