cogo_http/server/
listener.rs

1use std::sync::{Arc};
2use std::thread;
3
4use crate::net::NetworkListener;
5use crate::runtime;
6
7pub struct ListenerPool<A: NetworkListener> {
8    acceptor: A
9}
10
11impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
12    /// Create a thread pool to manage the acceptor.
13    pub fn new(acceptor: A) -> ListenerPool<A> {
14        ListenerPool { acceptor: acceptor }
15    }
16
17    /// Runs the acceptor pool. Blocks until the acceptors are closed.
18    ///
19    /// ## Panics
20    ///
21    /// Panics if threads == 0.
22    pub fn accept<F>(self, work: F, tasks: usize)
23        where F: Fn(A::Stream) + Send + Sync + 'static {
24        assert!(tasks != 0, "Can't accept on 0 threads.");
25
26        let (super_tx, supervisor_rx) = runtime::chan();
27
28        let work = Arc::new(work);
29
30        // Begin work.
31        for _ in 0..tasks {
32            spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
33        }
34
35        // Monitor for panics.
36        // FIXME(reem): This won't ever exit since we still have a super_tx handle.
37        for _ in supervisor_rx.iter() {
38            spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone());
39        }
40    }
41}
42
43fn spawn_with<A, F>(supervisor: runtime::Sender<()>, work: Arc<F>, mut acceptor: A)
44where A: NetworkListener + Send + 'static,
45      F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {
46    runtime::spawn(move || {
47        let _sentinel = Sentinel::new(supervisor, ());
48        loop {
49            match acceptor.accept() {
50                Ok(stream) => work(stream),
51                Err(e) => {
52                    info!("Connection failed: {}", e);
53                }
54            }
55        }
56    });
57}
58
59struct Sentinel<T: Send + 'static> {
60    value: Option<T>,
61    supervisor: runtime::Sender<T>,
62}
63
64impl<T: Send + 'static> Sentinel<T> {
65    fn new(channel: runtime::Sender<T>, data: T) -> Sentinel<T> {
66        Sentinel {
67            value: Some(data),
68            supervisor: channel,
69        }
70    }
71}
72
73impl<T: Send + 'static> Drop for Sentinel<T> {
74    fn drop(&mut self) {
75        // Respawn ourselves
76        let _ = self.supervisor.send(self.value.take().unwrap());
77    }
78}
79