Skip to main content

lustre_executor/
io.rs

1use crate::reactor::Reactor;
2use futures::future::Future;
3use mio::Interest;
4use std::io;
5use std::task::{Context, Poll as TaskPoll};
6use std::time::{Duration, Instant};
7
8pub struct TcpListenerFuture {
9    listener: mio::net::TcpListener,
10    token: Option<mio::Token>,
11    reactor: *mut Reactor,
12}
13
14impl TcpListenerFuture {
15    pub fn new(addr: std::net::SocketAddr, reactor: &mut Reactor) -> io::Result<Self> {
16        let listener = mio::net::TcpListener::bind(addr)?;
17        Ok(Self {
18            listener,
19            token: None,
20            reactor: reactor as *mut _,
21        })
22    }
23}
24
25impl Future for TcpListenerFuture {
26    type Output = mio::net::TcpStream;
27
28    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context) -> TaskPoll<Self::Output> {
29        unsafe {
30            let reactor = &mut *self.reactor;
31            if self.token.is_none() {
32                self.token = Some(reactor.register(
33                    &mut self.listener,
34                    Interest::READABLE,
35                    cx.waker().clone(),
36                ));
37            }
38            match self.listener.accept() {
39                Ok((stream, _)) => TaskPoll::Ready(stream),
40                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => TaskPoll::Pending,
41                Err(e) => panic!("Accept error: {}", e),
42            }
43        }
44    }
45}
46
47pub struct TimerFuture {
48    deadline: Instant,
49    token: Option<mio::Token>,
50    reactor: *mut Reactor,
51}
52
53impl TimerFuture {
54    pub fn new(duration: Duration, reactor: &mut Reactor) -> Self {
55        Self {
56            deadline: Instant::now() + duration,
57            token: None,
58            reactor: reactor as *mut _,
59        }
60    }
61}
62
63impl Future for TimerFuture {
64    type Output = ();
65
66    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context) -> TaskPoll<Self::Output> {
67        if Instant::now() >= self.deadline {
68            TaskPoll::Ready(())
69        } else {
70            // For simplicity, wake immediately; in a real impl, use mio timer or reactor integration
71            cx.waker().wake_by_ref();
72            TaskPoll::Pending
73        }
74    }
75}