cogo_http/server/
listener.rs1use std::sync::{Arc};
2use std::thread;
3
4use crate::net::NetworkListener;
5use crate::runtime;
6
7pub struct ListenerPool<A: NetworkListener> {
8 acceptor: A
9}
10
11impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
12 pub fn new(acceptor: A) -> ListenerPool<A> {
14 ListenerPool { acceptor: acceptor }
15 }
16
17 pub fn accept<F>(self, work: F, tasks: usize)
23 where F: Fn(A::Stream) + Send + Sync + 'static {
24 assert!(tasks != 0, "Can't accept on 0 threads.");
25
26 let (super_tx, supervisor_rx) = runtime::chan();
27
28 let work = Arc::new(work);
29
30 for _ in 0..tasks {
32 spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
33 }
34
35 for _ in supervisor_rx.iter() {
38 spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone());
39 }
40 }
41}
42
43fn spawn_with<A, F>(supervisor: runtime::Sender<()>, work: Arc<F>, mut acceptor: A)
44where A: NetworkListener + Send + 'static,
45 F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {
46 runtime::spawn(move || {
47 let _sentinel = Sentinel::new(supervisor, ());
48 loop {
49 match acceptor.accept() {
50 Ok(stream) => work(stream),
51 Err(e) => {
52 info!("Connection failed: {}", e);
53 }
54 }
55 }
56 });
57}
58
59struct Sentinel<T: Send + 'static> {
60 value: Option<T>,
61 supervisor: runtime::Sender<T>,
62}
63
64impl<T: Send + 'static> Sentinel<T> {
65 fn new(channel: runtime::Sender<T>, data: T) -> Sentinel<T> {
66 Sentinel {
67 value: Some(data),
68 supervisor: channel,
69 }
70 }
71}
72
73impl<T: Send + 'static> Drop for Sentinel<T> {
74 fn drop(&mut self) {
75 let _ = self.supervisor.send(self.value.take().unwrap());
77 }
78}
79