background_jobs_server/server/
pull.rs1use std::{sync::Arc, time::Duration};
21
22use background_jobs_core::{JobInfo, NewJobInfo, Storage};
23use failure::{Error, Fail};
24use futures::{future::poll_fn, Future, Stream};
25#[cfg(feature = "futures-zmq")]
26use futures_zmq::{prelude::*, Multipart, Pull};
27use log::{error, info, trace};
28use serde_derive::Deserialize;
29use tokio::timer::Delay;
30use tokio_threadpool::blocking;
31#[cfg(feature = "tokio-zmq")]
32use tokio_zmq::{prelude::*, Multipart, Pull};
33
34use crate::server::{coerce, Config};
35
36#[derive(Clone, Debug, Deserialize)]
37#[serde(untagged)]
38enum EitherJob {
39 Existing(JobInfo),
40 New(NewJobInfo),
41}
42
43pub(crate) struct PullConfig {
44 server_id: usize,
45 puller: Pull,
46 address: String,
47 storage: Arc<Storage>,
48 config: Arc<Config>,
49}
50
51impl PullConfig {
52 pub(crate) fn init(
53 server_id: usize,
54 address: String,
55 storage: Arc<Storage>,
56 config: Arc<Config>,
57 ) -> impl Future<Item = (), Error = ()> {
58 let cfg = ResetPullConfig {
59 server_id,
60 address,
61 storage,
62 config,
63 };
64
65 cfg.build()
66 .map_err(|e| error!("Error starting puller, {}", e))
67 }
68
69 fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
70 let config = self.reset();
71 let server_id = self.server_id;
72
73 let storage = self.storage.clone();
74
75 let fut = self
76 .puller
77 .stream()
78 .from_err()
79 .map(|m| {
80 trace!("Handling new message");
81 m
82 })
83 .and_then(parse_job)
84 .and_then(move |job| {
85 trace!("Storing job, {:?}", job);
86 store_job(job, storage.clone(), server_id)
87 })
88 .for_each(|_| Ok(()))
89 .map(|_| info!("Puller is shutting down"))
90 .map_err(|e| {
91 error!("Error storing job, {}", e);
92
93 tokio::spawn(config.rebuild());
94 });
95
96 Box::new(fut)
97 }
98
99 fn reset(&self) -> ResetPullConfig {
100 ResetPullConfig {
101 server_id: self.server_id,
102 address: self.address.clone(),
103 storage: self.storage.clone(),
104 config: self.config.clone(),
105 }
106 }
107}
108
109#[derive(Clone, Debug, Fail)]
110#[fail(display = "Message was empty")]
111pub struct EmptyMessage;
112
113fn parse_job(mut multipart: Multipart) -> Result<EitherJob, Error> {
114 let unparsed_msg = multipart.pop_front().ok_or(EmptyMessage)?;
115
116 let parsed = serde_json::from_slice(&unparsed_msg)?;
117
118 Ok(parsed)
119}
120
121fn store_job(
122 job: EitherJob,
123 storage: Arc<Storage>,
124 server_id: usize,
125) -> impl Future<Item = (), Error = Error> {
126 let storage = storage.clone();
127
128 poll_fn(move || {
129 let job = job.clone();
130 let storage = storage.clone();
131
132 blocking(move || {
133 let job = match job {
134 EitherJob::New(new_job) => storage.assign_id(new_job, server_id)?,
135 EitherJob::Existing(job) => job,
136 };
137
138 if job.is_pending() {
139 info!("Storing pending job, {}", job.id());
140 }
141 if job.is_finished() {
142 info!("Finished job {}", job.id());
143 }
144 if job.is_failed() {
145 info!("Job failed {}", job.id());
146 }
147
148 storage.store_job(job, server_id).map_err(Error::from)
149 })
150 .map_err(Error::from)
151 })
152 .then(coerce)
153}
154
155struct ResetPullConfig {
156 server_id: usize,
157 address: String,
158 storage: Arc<Storage>,
159 config: Arc<Config>,
160}
161
162impl ResetPullConfig {
163 fn rebuild(self) -> impl Future<Item = (), Error = ()> {
164 Delay::new(tokio::clock::now() + Duration::from_secs(5))
165 .from_err()
166 .and_then(move |_| self.build())
167 .map_err(|e| error!("Error restarting puller, {}", e))
168 }
169
170 fn build(self) -> impl Future<Item = (), Error = Error> {
171 Pull::builder(self.config.context.clone())
172 .bind(&self.address)
173 .build()
174 .map(|puller| {
175 let config = PullConfig {
176 server_id: self.server_id,
177 puller,
178 address: self.address,
179 storage: self.storage,
180 config: self.config,
181 };
182
183 tokio::spawn(config.run());
184 })
185 .from_err()
186 }
187}