1use std;
9
10use std::thread;
11use std::sync::mpsc;
12use std::sync::mpsc::Receiver;
13use std::sync::mpsc::SendError;
14
15#[allow(unused_imports)]
16use util::core::*;
17
18use service_util::MessageWriter;
19
20
21pub trait AgentRunnable {
25
26 fn run_agent(self, agent_inner: AgentInnerRunner);
28
29}
30
31impl<FN : FnOnce(AgentInnerRunner)> AgentRunnable for FN {
32 fn run_agent(self, agent_lr: AgentInnerRunner) {
33 self(agent_lr)
34 }
35}
36
37pub type OutputAgentTask = Box<Fn(&mut MessageWriter) + Send>;
40
41pub enum OutputAgentMessage {
42 Shutdown,
43 Task(OutputAgentTask),
44}
45
46const ERR_SEND_TASK_FAILED : &'static str =
47 "Failed to send task, Agent receive channel is closed.";
48
49pub struct OutputAgent {
59 is_shutdown : bool,
60 output_thread : Option<thread::JoinHandle<()>>,
61 task_queue : mpsc::Sender<OutputAgentMessage>,
62}
63
64impl OutputAgent {
65
66 pub fn start_with_provider<OUT, OUT_P>(msg_writer_provider: OUT_P)
67 -> OutputAgent
68 where
69 OUT : MessageWriter + 'static,
70 OUT_P : FnOnce() -> OUT + Send + 'static
71 {
72 Self::start(move |inner_runner: AgentInnerRunner| {
73 let mut msg_writer: OUT = msg_writer_provider();
74
75 inner_runner.enter_agent_loop(&mut move |task: OutputAgentTask| {
76 task(&mut msg_writer);
77 });
78 })
79 }
80
81
82 pub fn start<AGENT_RUNNER>(agent_runner: AGENT_RUNNER)
83 -> OutputAgent
84 where
85 AGENT_RUNNER : AgentRunnable,
86 AGENT_RUNNER : Send + 'static,
87 {
88 let (tx, rx) = mpsc::channel::<OutputAgentMessage>();
89
90 let output_thread = thread::spawn(move || {
91 agent_runner.run_agent(AgentInnerRunner{ rx : rx });
92 });
93
94 OutputAgent { is_shutdown : false, task_queue : tx, output_thread : Some(output_thread) }
95 }
96
97 pub fn is_shutdown(&self) -> bool {
98 self.is_shutdown
99 }
100
101 pub fn try_submit_task(& self, task : OutputAgentTask) -> Result<(), SendError<OutputAgentMessage>> {
102 self.task_queue.send(OutputAgentMessage::Task(task))
103 }
104
105 pub fn submit_task(& self, task : OutputAgentTask) {
106 assert!(!self.is_shutdown);
107 self.try_submit_task(task).expect(ERR_SEND_TASK_FAILED);
108 }
109
110 pub fn request_shutdown(&mut self) {
111 if !self.is_shutdown {
112 self.is_shutdown = true;
113 self.task_queue.send(OutputAgentMessage::Shutdown).ok();
115 }
116 }
117
118 pub fn shutdown_and_soft_join(&mut self) -> thread::Result<()> {
119 self.request_shutdown();
120
121 let output_thread = std::mem::replace(&mut self.output_thread, None);
122
123 if let Some(output_thread) = output_thread {
124 output_thread.join()
125 } else {
126 Ok(())
127 }
128 }
129
130 pub fn shutdown_and_join(&mut self) {
131 if let Err(err) = self.shutdown_and_soft_join() {
132 panic!(err);
134 }
135 }
136
137}
138
139impl Drop for OutputAgent {
140
141 fn drop(&mut self) {
142 if !thread::panicking() {
143 assert!(self.is_shutdown());
145 }
146 }
147
148}
149
150pub struct AgentInnerRunner {
151 rx: Receiver<OutputAgentMessage>,
152}
153impl AgentInnerRunner {
154
155 pub fn enter_agent_loop<TASK_RUNNER : ?Sized>(self, task_runner: &mut TASK_RUNNER)
157 where
158 TASK_RUNNER : FnMut(OutputAgentTask)
159 {
160 let mut rx = self.rx;
161 Self::run_agent_loop(&mut rx, task_runner);
162 }
163
164 pub fn run_agent_loop<TASK_RUNNER : ?Sized>(rx: &mut Receiver<OutputAgentMessage>, task_runner: &mut TASK_RUNNER)
165 where
166 TASK_RUNNER : FnMut(OutputAgentTask)
167 {
168 loop {
169 let task_message = rx.recv();
170 if let Err(err) = task_message {
171 panic!("Error, task queue channel closed without explicit agent shutdown: {:?}", err);
173 }
174 let task_message = task_message.unwrap();
175
176 match task_message {
177 OutputAgentMessage::Shutdown => {
178 return;
179 }
180 OutputAgentMessage::Task(task) => {
181 task_runner(task);
182 }
183 }
184 }
185 }
186}
187
188
189#[test]
192fn test_OutputAgent() {
193
194 use util::tests::*;
195 use service_util::WriteLineMessageWriter;
196
197 let output = vec![];
198 let mut agent = OutputAgent::start_with_provider(move || WriteLineMessageWriter(output));
199
200 agent.submit_task(new(|msg_writer| {
201 msg_writer.write_message("First responde.").unwrap();
202 }));
203
204 agent.shutdown_and_join();
205 agent.shutdown_and_join();
207
208
209 let output = newArcMutex(vec![] as Vec<u8>);
210 let output2 = output.clone();
211
212 let mut agent = OutputAgent::start(move |inner_runner: AgentInnerRunner| {
213 inner_runner.enter_agent_loop(&mut move |task: OutputAgentTask| {
214 let mut lock : std::sync::MutexGuard<Vec<u8>> = output2.lock().unwrap();
215 task(&mut WriteLineMessageWriter(&mut *lock));
216 });
217 });
218
219 agent.submit_task(new(|msg_writer| {
220 msg_writer.write_message("First response.").unwrap();
221 }));
222
223 agent.shutdown_and_join();
224
225 assert_equal(String::from_utf8(unwrap_ArcMutex(output)).unwrap(), "First response.\n".to_string());
226}
227
228#[cfg(test)]
230pub fn test_OutputAgent_API() {
231 use std::sync::Arc;
232 use std::net::TcpStream;
233 use std::sync::Mutex;
234 use std::io::Read;
235 use std::io;
236 use service_util::WriteLineMessageWriter;
237
238 let mut agent = OutputAgent::start_with_provider(|| WriteLineMessageWriter(Vec::<u8>::new()));
240 agent.shutdown_and_join();
241
242 let mut agent = OutputAgent::start_with_provider(|| WriteLineMessageWriter(std::io::stdout()));
244 agent.shutdown_and_join();
245
246
247 let mut agent = OutputAgent::start(|inner_runner: AgentInnerRunner| {
249 let stdout = io::stdout();
250 let mut stdoutlock = stdout.lock();
251
252 inner_runner.enter_agent_loop(&mut |task: OutputAgentTask| {
253 task(&mut WriteLineMessageWriter(&mut stdoutlock));
254 });
255 });
256 agent.shutdown_and_join();
257
258
259 let stream = Arc::new(Mutex::new(TcpStream::connect("127.0.0.1:34254").unwrap()));
261 let stream2 = stream.clone();
262 let mut agent = OutputAgent::start(move |inner_runner: AgentInnerRunner| {
263 inner_runner.enter_agent_loop(&mut |task: OutputAgentTask| {
264 let mut stream = stream2.lock().expect("Re-entered mutex lock");
265 task(&mut WriteLineMessageWriter(&mut *stream));
266 });
267 });
268 agent.shutdown_and_join();
269
270 {
271 let mut stream = stream.lock().expect("Re-entered mutex lock");
272 stream.read_to_string(&mut String::new()).expect("failed to read stream");
273 }
274}