network_communicator/
manager.rs1use tokio_core::reactor::{Core,Remote};
2use std::thread::Builder as ThreadBuilder;
3use std::thread::JoinHandle;
4use std::sync::{Arc,Mutex,RwLock};
5use tokio_curl::Session;
6use std::sync::mpsc::{sync_channel,SyncSender,Receiver};
7use tokio_core::reactor::Handle;
8use futures::Future;
9use futures::future;
10use std::sync::mpsc::{RecvError,SendError};
11use task::{is_terminate_task,generate_terminate_task};
12use std::ops::Drop;
13use std::mem::swap as swap_variables;
14use request_future::RequestFuture;
15use super::Task;
16use super::RequestDownloader;
17use super::RequestDownloaderResult;
18use super::Error;
19use super::Config;
20
21struct Worker {
22 remote: Remote,
23 session: Arc<Mutex<Session>>,
24 thread_handle: JoinHandle<()>,
25 is_terminating: Arc<RwLock<bool>>,
26}
27
28impl Worker {
29 pub fn new() -> Worker {
30 let (tx,rx) = sync_channel::<(Arc<Mutex<Session>>,Remote)>(0);
31 let is_terminating = Arc::new(RwLock::new(false));
32 let is_terminating_thread = is_terminating.clone();
33 let thread_handle = ThreadBuilder::new().spawn(
34 move || {
35 let mut lp = Core::new().expect("Unable to init downloader event-loop");
36 let session = Arc::new(Mutex::new(Session::new(lp.handle())));
37 let remote = lp.remote();
38 tx.send((session,remote)).expect("Unable to send session and remote");
39 loop {
40 {
41 let is_terminating = is_terminating_thread.read().expect("Unable to lock mutex");
42 if *is_terminating {
43 break;
44 }
45 }
46 lp.turn(None);
47 }
48 }
49 ).expect(
50 "Unable to init woker thread"
51 );
52 let (session,remote) = rx.recv().expect("Unablet to get session and remote");
53 return Worker {
54 remote: remote,
55 session: session,
56 thread_handle: thread_handle,
57 is_terminating: is_terminating,
58 };
59 }
60
61 fn terminate(self) {
62 {
63 let mut is_terminating = self.is_terminating.write().expect("Unable to lock mutex");
64 *is_terminating = true;
65 }
66 self.remote.spawn(move |_handle:&Handle|{
67 future::ok::<(),()>(())
68 });
69 self.thread_handle.join().expect("Unable to stop thread");
70
71 }
72}
73
74#[derive(Debug)]
76pub struct NetworkManagerHandle<T: Send + 'static,E: Send + 'static> {
77 task_rx: SyncSender<Task<T,E>>,
78 result_tx: Receiver<RequestDownloaderResult<T,E>>,
79 manager_handle: Option< JoinHandle<()> >,
80}
81
82impl <T: Send + 'static,E: Send + 'static>NetworkManagerHandle<T,E> {
83 pub fn send(&self,task: Task<T,E>) -> Result<(), SendError<Task<T,E>>> {
85 return self.task_rx.send(task);
86 }
87
88 pub fn get_sender(&self) -> SyncSender<Task<T,E>> {
90 return self.task_rx.clone();
91 }
92
93 pub fn recv(&self) -> Result<RequestDownloaderResult<T,E>, RecvError> {
95 return self.result_tx.recv();
96 }
97}
98
99impl <T: Send + 'static,E: Send + 'static>Drop for NetworkManagerHandle<T,E> {
100 fn drop(&mut self) {
102 self.task_rx.send(
103 generate_terminate_task()
104 ).expect(
105 "Unable to send termination task"
106 );
107 let mut manager_handle: Option<JoinHandle<()>> = None;
108 swap_variables(&mut manager_handle,&mut self.manager_handle);
109 manager_handle.unwrap().join().expect(
110 "Unable to wait download manager thread"
111 );
112 }
113}
114
115pub struct NetworkManager<T: Send + 'static,E: Send + 'static> {
117 remotes: Vec<Worker>,
118 result_tx: SyncSender<RequestDownloaderResult<T,E>>,
119}
120
121impl <T: Send + 'static,E: Send + 'static>NetworkManager<T,E> {
122
123 fn terminate_workers(&mut self) {
124 for worker in self.remotes.drain(..) {
125 worker.terminate();
126 }
127 }
128
129 pub fn start(config: &Config) -> Result<NetworkManagerHandle<T,E>,Error<E>> {
132 let mut remotes = vec![];
133 let (result_tx,result_rx) = sync_channel::<RequestDownloaderResult<T,E>>(config.get_limit_result_channel());
134 for _ in 0..config.get_thread_count() {
135 remotes.push(Worker::new());
136 }
137 let mut manager = NetworkManager {
138 remotes: remotes,
139 result_tx: result_tx.clone(),
140 };
141 let (tx,rx) = sync_channel::<Task<T,E>>(config.get_limit_task_channel());
142 let thread_handle = ThreadBuilder::new().spawn(
143 move || {
144 for worker in manager.remotes.iter().cycle() {
145 let task = rx.recv().expect("Unable to get task");
146 if is_terminate_task(&task) {
147 break;
148 }
149 let manager_result_tx = manager.result_tx.clone();
150 let worker_session = worker.session.clone();
151 worker.remote.spawn(move |_handle:&Handle|{
152 let request_result = RequestDownloader::new(task,&*worker_session.lock().unwrap(),manager_result_tx.clone());
153 let result = match request_result {
154 Ok(request) => {
155 RequestFuture::Process(request)
156 },
157 Err(request_error) => {
158 manager_result_tx.send(
159 Err( request_error )
160 ).expect("Unable to send result");
161 RequestFuture::Ready
162 },
163 };
164 return result.map(|_|{()}).map_err(|_|{()});
165 });
166 }
167 manager.terminate_workers();
168 }
169 );
170 match thread_handle {
171 Ok(thread_handle) => {
172 return Ok(NetworkManagerHandle {
173 task_rx: tx,
174 result_tx: result_rx,
175 manager_handle: Some(thread_handle),
176 });
177 },
178 Err(thread_error) => {
179 return Err(Error::ThreadStartError { error: thread_error });
180 }
181 }
182 }
183}