mtcp_rs/
manager.rs

1/*
2 * mtcp - TcpListener/TcpStream *with* timeout/cancellation support
3 * This is free and unencumbered software released into the public domain.
4 */
5use std::cell::{RefCell, Ref, RefMut};
6use std::io::{Result, ErrorKind};
7use std::rc::Rc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::Duration;
10
11use mio::{Poll, Events, Token, Waker, Registry};
12
13use lazy_rc::{LazyRc, LazyArc};
14
15use crate::TcpCanceller;
16use crate::utilities::Flag;
17
18const SHUTDOWN: Token = Token(usize::MAX);
19
20thread_local! { 
21    static INSTANCE: LazyRc<TcpManager> = LazyRc::empty();
22}
23
24/// A manager for "shared" resources, used by
25/// [`mtcp_rs::TcpListener`](crate::TcpListener) and
26/// [`mtcp_rs::TcpStream`](crate::TcpStream)
27/// 
28/// The *same* `mtcp_rs::TcpManager` instance can be shared by *multiple*
29/// `mtcp_rs::TcpListener` and `mtcp_rs::TcpStream` instances. However, an
30/// `mtcp_rs::TcpManager` instance can **not** be shared across the thread
31/// boundary: Each thread needs to create its own `mtcp_rs::TcpManager` instance.
32/// A *thread-local* singleton instance can be obtained via the
33/// [`instance()`](TcpManager::instance()) function.
34/// 
35/// The [`canceller()`](TcpManager::canceller()) function can be used to obtain
36/// a new [`mtcp_rs::TcpCanceller`](crate::TcpCanceller) instance for *this*
37/// [`mtcp_rs::TcpManager`](crate::TcpCanceller).
38#[derive(Debug)]
39pub struct TcpManager {
40    context: RefCell<TcpPollContext>,
41    cancelled: LazyArc<Flag>,
42}
43
44#[derive(Debug)]
45pub(crate) struct TcpPollContext {
46    poll: Poll,
47    events: Events,
48    next: AtomicUsize,
49}
50
51impl TcpManager {
52    /// Get the [thread-local](std::thread_local) *singleton* `TcpManager`
53    /// instance for the calling thread. The instance is created lazily for
54    /// each thread.
55    pub fn instance() -> Result<Rc<Self>> {
56        INSTANCE.with(|val| val.or_try_init_with(Self::new))
57    }
58
59    /// Create a new `TcpManager` instance with *default* queue capacity.
60    pub fn new() -> Result<Self> {
61        Self::with_capacity(128)
62    }
63
64    /// Create a new `TcpManager` instance with the specified queue capacity.
65    pub fn with_capacity(capacity: usize) -> Result<Self> {
66        let context = TcpPollContext::new(capacity)?;
67        Ok(Self {
68            context: RefCell::new(context),
69            cancelled: LazyArc::empty(),
70        })
71    }
72
73    /// Create a new [`mtcp_rs::TcpCanceller`](crate::TcpCanceller) instance
74    /// for this `TcpManager`.
75    pub fn canceller(&self) -> Result<TcpCanceller> {
76        self.cancelled
77            .or_try_init_with(|| Ok(Flag::new(Waker::new(self.context().poll.registry(), SHUTDOWN)?)))
78            .map(TcpCanceller::from)
79    }
80
81    /// Check whether this `TcpManager` instance has been
82    /// [cancelled](crate::TcpCanceller::cancel) yet.
83    /// 
84    /// Returns `true`, if cancellation has been requested for this
85    /// `TcpManager`; or `false` otherwise.
86    pub fn cancelled(&self) -> bool {
87        self.cancelled.map(|flag| flag.check()).unwrap_or(false)
88    }
89
90    /// Restart the `TcpManager`, i.e. clear its "cancellation" status.
91    /// 
92    /// Returns `true`, if the `TcpManager` was restarted successfully; or
93    /// `false`, if the `TcpManager` was **not** in a "cancelled" state.
94    pub fn restart(&self) -> Result<bool> {
95        self.cancelled.map(|flag| flag.clear()).unwrap_or(Ok(false))
96    }
97
98    pub(crate) fn context(&self) -> Ref<TcpPollContext> {
99        self.context.borrow()
100    }
101
102    pub(crate) fn context_mut(&self) -> RefMut<TcpPollContext> {
103        self.context.borrow_mut()
104    }
105}
106
107impl TcpPollContext {
108    fn new(capacity: usize) -> Result<Self> {
109        Ok(Self {
110            poll: Poll::new()?,
111            events: Events::with_capacity(capacity),
112            next: AtomicUsize::new(usize::MIN),
113        })
114    }
115
116    pub fn token(&self) -> Token {
117        loop {
118            let token = Token(self.next.fetch_add(1, Ordering::Relaxed));
119            if token != SHUTDOWN {
120                return token;
121            }
122        }
123    }
124
125    pub fn poll(&mut self, timeout: Option<Duration>) -> Result<&Events>{
126        loop {
127            match self.poll.poll(&mut self.events, timeout) {
128                Ok(_) => return Ok(&self.events),
129                Err(error) => {
130                    if error.kind() != ErrorKind::Interrupted {
131                        return Err(error);
132                    }
133                },
134            }
135        }
136    }
137
138    pub fn registry(&self) -> &Registry {
139        self.poll.registry()
140    }
141}