jsonrpc/
output_agent.rs

1// Copyright 2016 Bruno Medeiros
2//
3// Licensed under the Apache License, Version 2.0 
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0>. 
5// This file may not be copied, modified, or distributed
6// except according to those terms.
7
8use std;
9
10use std::thread;
11use std::sync::mpsc;
12use std::sync::mpsc::Receiver;
13use std::sync::mpsc::SendError;
14
15#[allow(unused_imports)]
16use util::core::*;
17
18use service_util::MessageWriter;
19
20
21/* -----------------  ----------------- */
22
23/// Functional interface representing the execution of the Agent
24pub trait AgentRunnable {
25    
26    /// Run the Agent. Must end with a call to `agent_inner.enter_agent_loop()`
27    fn run_agent(self, agent_inner: AgentInnerRunner);
28    
29}
30
31impl<FN : FnOnce(AgentInnerRunner)> AgentRunnable for FN {
32    fn run_agent(self, agent_lr: AgentInnerRunner) {
33        self(agent_lr)
34    }
35}
36
37/* ----------------- Output_Agent ----------------- */
38
39pub type OutputAgentTask = Box<Fn(&mut MessageWriter) + Send>;
40
41pub enum OutputAgentMessage {
42    Shutdown,
43    Task(OutputAgentTask),
44}
45
/// Panic message used when a task cannot be sent because the receiving end
/// of the agent's channel no longer exists.
const ERR_SEND_TASK_FAILED: &'static str = "Failed to send task, Agent receive channel is closed.";
48
49/**
50
51Actor-like, dedicated worker thread that handles writing to an output stream.
52Accepts tasks as messages, which are executed by the agent.
53
54Note that the OutputAgent type is not meant to be Sync, it is meant to be synchronized externally,
55or more typically, used by one controlling thread only. 
56
57 */
58pub struct OutputAgent {
59    is_shutdown : bool,
60    output_thread : Option<thread::JoinHandle<()>>,
61    task_queue : mpsc::Sender<OutputAgentMessage>,
62}
63
64impl OutputAgent {
65    
66    pub fn start_with_provider<OUT, OUT_P>(msg_writer_provider: OUT_P) 
67        -> OutputAgent
68    where 
69        OUT : MessageWriter + 'static, 
70        OUT_P : FnOnce() -> OUT + Send + 'static 
71    {
72        Self::start(move |inner_runner: AgentInnerRunner| {
73            let mut msg_writer: OUT = msg_writer_provider();
74            
75            inner_runner.enter_agent_loop(&mut move |task: OutputAgentTask| {
76                task(&mut msg_writer); 
77            });
78        })
79    }
80    
81    
82    pub fn start<AGENT_RUNNER>(agent_runner: AGENT_RUNNER) 
83        -> OutputAgent
84    where 
85        AGENT_RUNNER : AgentRunnable,
86        AGENT_RUNNER : Send + 'static,
87    {
88        let (tx, rx) = mpsc::channel::<OutputAgentMessage>();
89        
90        let output_thread = thread::spawn(move || {
91            agent_runner.run_agent(AgentInnerRunner{ rx : rx });
92        });
93        
94        OutputAgent { is_shutdown : false, task_queue : tx,  output_thread : Some(output_thread) }     
95    }
96    
97    pub fn is_shutdown(&self) -> bool {
98        self.is_shutdown
99    }
100    
101    pub fn try_submit_task(& self, task : OutputAgentTask) -> Result<(), SendError<OutputAgentMessage>> {
102        self.task_queue.send(OutputAgentMessage::Task(task))
103    }
104    
105    pub fn submit_task(& self, task : OutputAgentTask) {
106        assert!(!self.is_shutdown);
107        self.try_submit_task(task).expect(ERR_SEND_TASK_FAILED);
108    }
109    
110    pub fn request_shutdown(&mut self) {
111        if !self.is_shutdown {
112            self.is_shutdown = true;
113            // send shutdown message
114            self.task_queue.send(OutputAgentMessage::Shutdown).ok();
115        }
116    }
117    
118    pub fn shutdown_and_soft_join(&mut self) -> thread::Result<()> {
119        self.request_shutdown();
120        
121        let output_thread = std::mem::replace(&mut self.output_thread, None);
122        
123        if let Some(output_thread) = output_thread {
124            output_thread.join()
125        } else {
126            Ok(())
127        }
128    }
129    
130    pub fn shutdown_and_join(&mut self) {
131        if let Err(err) = self.shutdown_and_soft_join() {
132            // re-panic
133            panic!(err);
134        }
135    }
136    
137}
138
139impl Drop for OutputAgent {
140    
141    fn drop(&mut self) {
142        if !thread::panicking() {
143            // User must have taken care of shutdown itself, otherwise thread is leaked.
144            assert!(self.is_shutdown());
145        }
146    }
147    
148}
149
150pub struct AgentInnerRunner {
151    rx: Receiver<OutputAgentMessage>,
152}
153impl AgentInnerRunner {
154    
155    /// Enter agent loop, with given task runner
156    pub fn enter_agent_loop<TASK_RUNNER : ?Sized>(self, task_runner: &mut TASK_RUNNER)
157    where
158         TASK_RUNNER : FnMut(OutputAgentTask) 
159    {
160        let mut rx = self.rx;
161        Self::run_agent_loop(&mut rx, task_runner);
162    }
163    
164    pub fn run_agent_loop<TASK_RUNNER : ?Sized>(rx: &mut Receiver<OutputAgentMessage>, task_runner: &mut TASK_RUNNER)
165    where
166         TASK_RUNNER : FnMut(OutputAgentTask) 
167    {
168        loop {
169            let task_message = rx.recv();
170            if let Err(err) = task_message {
171                // BM: Should we really panic if agent has not shutdown explicitly?
172                panic!("Error, task queue channel closed without explicit agent shutdown: {:?}", err);
173            }
174            let task_message = task_message.unwrap();
175            
176            match task_message {
177                OutputAgentMessage::Shutdown => { 
178                    return; 
179                }
180                OutputAgentMessage::Task(task) => {
181                    task_runner(task);
182                }
183            }
184        }
185    }
186}
187
188
189/* -----------------  ----------------- */
190
#[test]
fn test_OutputAgent() {
    use service_util::WriteLineMessageWriter;
    use util::tests::*;

    // Scenario 1: the writer is created by a provider on the agent thread.
    let owned_sink = vec![];
    let mut provider_agent =
        OutputAgent::start_with_provider(move || WriteLineMessageWriter(owned_sink));

    provider_agent.submit_task(new(|msg_writer| {
        msg_writer.write_message("First responde.").unwrap();
    }));

    provider_agent.shutdown_and_join();
    // Shutting down a second time must be harmless (re-entrance).
    provider_agent.shutdown_and_join();

    // Scenario 2: the output buffer is shared with the test through a mutex,
    // so that what the agent wrote can be inspected afterwards.
    let shared_output = newArcMutex(vec![] as Vec<u8>);
    let agent_output = shared_output.clone();

    let mut shared_agent = OutputAgent::start(move |inner_runner: AgentInnerRunner| {
        inner_runner.enter_agent_loop(&mut move |task: OutputAgentTask| {
            let mut lock : std::sync::MutexGuard<Vec<u8>> = agent_output.lock().unwrap();
            task(&mut WriteLineMessageWriter(&mut *lock));
        });
    });

    shared_agent.submit_task(new(|msg_writer| {
        msg_writer.write_message("First response.").unwrap();
    }));

    shared_agent.shutdown_and_join();

    let written = String::from_utf8(unwrap_ArcMutex(shared_output)).unwrap();
    assert_equal(written, "First response.\n".to_string());
}
227
// Compile-only check of the OutputAgent API: this function is not meant to be
// run, it only has to type-check with the different kinds of writers below.
#[cfg(test)]
pub fn test_OutputAgent_API() {
    use std::io;
    use std::io::Read;
    use std::net::TcpStream;
    use std::sync::Arc;
    use std::sync::Mutex;
    use service_util::WriteLineMessageWriter;

    // Writer backed by an in-memory Vec<u8>
    let mut vec_agent = OutputAgent::start_with_provider(|| WriteLineMessageWriter(Vec::<u8>::new()));
    vec_agent.shutdown_and_join();

    // Writer backed by Stdout
    let mut stdout_agent = OutputAgent::start_with_provider(|| WriteLineMessageWriter(io::stdout()));
    stdout_agent.shutdown_and_join();

    // Writer backed by a StdoutLock that is held for the entire agent loop
    let mut locked_stdout_agent = OutputAgent::start(|inner_runner: AgentInnerRunner| {
        let stdout = io::stdout();
        let mut stdoutlock = stdout.lock();

        inner_runner.enter_agent_loop(&mut |task: OutputAgentTask| {
            task(&mut WriteLineMessageWriter(&mut stdoutlock));
        });
    });
    locked_stdout_agent.shutdown_and_join();

    // Writer backed by a TCP stream shared with the calling code
    let shared_stream = Arc::new(Mutex::new(TcpStream::connect("127.0.0.1:34254").unwrap()));
    let agent_stream = shared_stream.clone();
    let mut tcp_agent = OutputAgent::start(move |inner_runner: AgentInnerRunner| {
        inner_runner.enter_agent_loop(&mut |task: OutputAgentTask| {
            let mut guard = agent_stream.lock().expect("Re-entered mutex lock");
            task(&mut WriteLineMessageWriter(&mut *guard));
        });
    });
    tcp_agent.shutdown_and_join();

    // The caller's handle to the stream is still usable after the agent is gone.
    {
        let mut guard = shared_stream.lock().expect("Re-entered mutex lock");
        guard.read_to_string(&mut String::new()).expect("failed to read stream");
    }
}