1mod connection;
7
8use self::connection::PoolClientReader;
9use crate::message::{ClientCommand, PoolEvent, PoolReply};
10
11pub use self::connection::{PoolClientWriter, RequestId, Result};
12pub use crate::message::{ErrorReply, Job, JobAssignment};
13
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16
17use log::*;
18
19pub trait MessageHandler {
21 fn job_command(&mut self, job: Job);
22 fn error_reply(&mut self, id: RequestId, error: ErrorReply);
23 fn status_reply(&mut self, id: RequestId, status: String);
24 fn job_reply(&mut self, id: RequestId, job: Box<JobAssignment>);
25}
26
27pub struct PoolClient<H> {
29 writer: Arc<Mutex<PoolClientWriter>>,
30 reader: PoolClientReader,
31 handler: H,
32}
33
34impl<H: MessageHandler> PoolClient<H> {
35 pub fn connect<F>(
37 address: &str,
38 login: &str,
39 pass: &str,
40 keepalive: Option<Duration>,
41 agent: &str,
42 make_handler: F,
43 ) -> Result<Self>
44 where
45 F: FnOnce(Job) -> H,
46 {
47 let (writer, work, reader) = connection::connect(address, login, pass, agent, keepalive)?;
48 debug!("client connected, initial job: {:?}", &work);
49 let writer = Arc::new(Mutex::new(writer));
50 let handler = make_handler(work);
51 Ok(PoolClient {
52 writer,
53 reader,
54 handler,
55 })
56 }
57
58 pub fn write_handle(&self) -> Arc<Mutex<PoolClientWriter>> {
60 Arc::clone(&self.writer)
61 }
62
63 pub fn handler(&self) -> &H {
65 &self.handler
66 }
67
68 pub fn run(mut self) -> Result<()> {
70 loop {
71 let event = if let Some(event) = self.reader.read()? {
72 event
73 } else {
74 debug!("read timeout; sending keepalive");
75 self.writer.lock().unwrap().keepalive().unwrap();
76 continue;
77 };
78 match event {
79 PoolEvent::ClientCommand(ClientCommand::Job(j)) => self.handler.job_command(j),
80 PoolEvent::PoolReply {
81 id,
82 error: Some(error),
83 ..
84 } => self.handler.error_reply(id, error),
85 PoolEvent::PoolReply {
86 id,
87 error: None,
88 result: Some(PoolReply::Status { status }),
89 } => self.handler.status_reply(id, status),
90 PoolEvent::PoolReply {
91 id,
92 error: None,
93 result: Some(PoolReply::Job(job)),
94 } => self.handler.job_reply(id, job),
95 PoolEvent::PoolReply {
96 error: None,
97 result: None,
98 ..
99 } => warn!("pool reply with no content"),
100 }
101 }
102 }
103}