Skip to main content

network_communicator/
manager.rs

1use tokio_core::reactor::{Core,Remote};
2use std::thread::Builder as ThreadBuilder;
3use std::thread::JoinHandle;
4use std::sync::{Arc,Mutex,RwLock};
5use tokio_curl::Session;
6use std::sync::mpsc::{sync_channel,SyncSender,Receiver};
7use tokio_core::reactor::Handle;
8use futures::Future;
9use futures::future;
10use std::sync::mpsc::{RecvError,SendError};
11use task::{is_terminate_task,generate_terminate_task};
12use std::ops::Drop;
13use std::mem::swap as swap_variables;
14use request_future::RequestFuture;
15use super::Task;
16use super::RequestDownloader;
17use super::RequestDownloaderResult;
18use super::Error;
19use super::Config;
20
21struct Worker {
22	remote: Remote,
23	session: Arc<Mutex<Session>>,
24	thread_handle: JoinHandle<()>,
25	is_terminating: Arc<RwLock<bool>>,
26}
27
28impl Worker {
29	pub fn new() -> Worker {
30		let (tx,rx) = sync_channel::<(Arc<Mutex<Session>>,Remote)>(0);
31		let is_terminating = Arc::new(RwLock::new(false));
32		let is_terminating_thread = is_terminating.clone();
33		let thread_handle = ThreadBuilder::new().spawn(
34			move || {
35				let mut lp = Core::new().expect("Unable to init downloader event-loop");
36				let session = Arc::new(Mutex::new(Session::new(lp.handle())));
37				let remote = lp.remote();
38				tx.send((session,remote)).expect("Unable to send session and remote");
39				loop {
40					{
41						let is_terminating = is_terminating_thread.read().expect("Unable to lock mutex");
42						if *is_terminating {
43							break;
44						}
45					}
46					lp.turn(None);
47				}
48			}
49		).expect(
50			"Unable to init woker thread"
51		);
52		let (session,remote) = rx.recv().expect("Unablet to get session and remote");
53		return Worker {
54			remote: remote,
55			session: session,
56			thread_handle: thread_handle,
57			is_terminating: is_terminating,
58		};
59	}
60
61	fn terminate(self) {
62		{
63			let mut is_terminating = self.is_terminating.write().expect("Unable to lock mutex");
64			*is_terminating = true;
65		}
66		self.remote.spawn(move |_handle:&Handle|{
67			future::ok::<(),()>(())
68		});
69		self.thread_handle.join().expect("Unable to stop thread");
70
71	}
72}
73
74/// Handle for working with network manager.
75#[derive(Debug)]
76pub struct NetworkManagerHandle<T: Send + 'static,E: Send + 'static> {
77	task_rx: SyncSender<Task<T,E>>,
78	result_tx: Receiver<RequestDownloaderResult<T,E>>,
79	manager_handle: Option< JoinHandle<()> >,
80}
81
82impl <T: Send + 'static,E: Send + 'static>NetworkManagerHandle<T,E> {
83	/// Aynchronous sending task to network manager.
84	pub fn send(&self,task: Task<T,E>) -> Result<(), SendError<Task<T,E>>> {
85		return self.task_rx.send(task);
86	}
87
88	/// Returns copy of task sender.
89	pub fn get_sender(&self) -> SyncSender<Task<T,E>> {
90		return self.task_rx.clone();
91	}
92
93	/// Receives result with locking.
94	pub fn recv(&self) -> Result<RequestDownloaderResult<T,E>, RecvError> {
95		return self.result_tx.recv();
96	}
97}
98
99impl <T: Send + 'static,E: Send + 'static>Drop for NetworkManagerHandle<T,E> {
100	/// When dropping we are waiting for termination of all threads.
101	fn drop(&mut self) {
102		self.task_rx.send(
103			generate_terminate_task()
104		).expect(
105			"Unable to send termination task"
106		);
107		let mut manager_handle: Option<JoinHandle<()>> = None;
108		swap_variables(&mut manager_handle,&mut self.manager_handle);
109		manager_handle.unwrap().join().expect(
110			"Unable to wait download manager thread"
111		);
112	}
113}
114
115/// Manager for processsing request.
116pub struct NetworkManager<T: Send + 'static,E: Send + 'static> {
117	remotes: Vec<Worker>,
118	result_tx: SyncSender<RequestDownloaderResult<T,E>>,
119}
120
121impl <T: Send + 'static,E: Send + 'static>NetworkManager<T,E> {
122
123	fn terminate_workers(&mut self) {
124		for worker in self.remotes.drain(..) {
125			worker.terminate();
126		}
127	}
128
129	/// Creates new network manager.
130	/// Produces threads that may panic when something is going wrong.
131	pub fn start(config: &Config) -> Result<NetworkManagerHandle<T,E>,Error<E>> {
132		let mut remotes = vec![];
133		let (result_tx,result_rx) = sync_channel::<RequestDownloaderResult<T,E>>(config.get_limit_result_channel());
134		for _ in 0..config.get_thread_count() {
135			remotes.push(Worker::new());
136		}
137		let mut manager = NetworkManager {
138			remotes: remotes,
139			result_tx: result_tx.clone(),
140		};
141		let (tx,rx) = sync_channel::<Task<T,E>>(config.get_limit_task_channel());
142		let thread_handle = ThreadBuilder::new().spawn(
143			move || {
144				for worker in manager.remotes.iter().cycle() {
145					let task = rx.recv().expect("Unable to get task");
146					if is_terminate_task(&task) {
147						break;
148					}
149					let manager_result_tx = manager.result_tx.clone();
150					let worker_session = worker.session.clone();
151					worker.remote.spawn(move |_handle:&Handle|{
152						let request_result = RequestDownloader::new(task,&*worker_session.lock().unwrap(),manager_result_tx.clone());
153						let result = match request_result {
154							Ok(request) => {
155								RequestFuture::Process(request)
156							},
157							Err(request_error) => {
158								manager_result_tx.send(
159									Err( request_error )
160								).expect("Unable to send result");
161								RequestFuture::Ready
162							},
163						};
164						return result.map(|_|{()}).map_err(|_|{()});
165					});
166				}
167				manager.terminate_workers();
168			}
169		);
170		match thread_handle {
171			Ok(thread_handle) => {
172				return Ok(NetworkManagerHandle {
173					task_rx: tx,
174					result_tx: result_rx,
175					manager_handle: Some(thread_handle),
176				});
177			},
178			Err(thread_error) => {
179				return Err(Error::ThreadStartError { error: thread_error });
180			}
181		}
182	}
183}