background_jobs_server/server/
pull.rs

1/*
2 * This file is part of Background Jobs.
3 *
4 * Copyright © 2018 Riley Trautman
5 *
6 * Background Jobs is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Background Jobs is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Background Jobs.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20use std::{sync::Arc, time::Duration};
21
22use background_jobs_core::{JobInfo, NewJobInfo, Storage};
23use failure::{Error, Fail};
24use futures::{future::poll_fn, Future, Stream};
25#[cfg(feature = "futures-zmq")]
26use futures_zmq::{prelude::*, Multipart, Pull};
27use log::{error, info, trace};
28use serde_derive::Deserialize;
29use tokio::timer::Delay;
30use tokio_threadpool::blocking;
31#[cfg(feature = "tokio-zmq")]
32use tokio_zmq::{prelude::*, Multipart, Pull};
33
34use crate::server::{coerce, Config};
35
36#[derive(Clone, Debug, Deserialize)]
37#[serde(untagged)]
38enum EitherJob {
39    Existing(JobInfo),
40    New(NewJobInfo),
41}
42
43pub(crate) struct PullConfig {
44    server_id: usize,
45    puller: Pull,
46    address: String,
47    storage: Arc<Storage>,
48    config: Arc<Config>,
49}
50
51impl PullConfig {
52    pub(crate) fn init(
53        server_id: usize,
54        address: String,
55        storage: Arc<Storage>,
56        config: Arc<Config>,
57    ) -> impl Future<Item = (), Error = ()> {
58        let cfg = ResetPullConfig {
59            server_id,
60            address,
61            storage,
62            config,
63        };
64
65        cfg.build()
66            .map_err(|e| error!("Error starting puller, {}", e))
67    }
68
69    fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
70        let config = self.reset();
71        let server_id = self.server_id;
72
73        let storage = self.storage.clone();
74
75        let fut = self
76            .puller
77            .stream()
78            .from_err()
79            .map(|m| {
80                trace!("Handling new message");
81                m
82            })
83            .and_then(parse_job)
84            .and_then(move |job| {
85                trace!("Storing job, {:?}", job);
86                store_job(job, storage.clone(), server_id)
87            })
88            .for_each(|_| Ok(()))
89            .map(|_| info!("Puller is shutting down"))
90            .map_err(|e| {
91                error!("Error storing job, {}", e);
92
93                tokio::spawn(config.rebuild());
94            });
95
96        Box::new(fut)
97    }
98
99    fn reset(&self) -> ResetPullConfig {
100        ResetPullConfig {
101            server_id: self.server_id,
102            address: self.address.clone(),
103            storage: self.storage.clone(),
104            config: self.config.clone(),
105        }
106    }
107}
108
109#[derive(Clone, Debug, Fail)]
110#[fail(display = "Message was empty")]
111pub struct EmptyMessage;
112
113fn parse_job(mut multipart: Multipart) -> Result<EitherJob, Error> {
114    let unparsed_msg = multipart.pop_front().ok_or(EmptyMessage)?;
115
116    let parsed = serde_json::from_slice(&unparsed_msg)?;
117
118    Ok(parsed)
119}
120
121fn store_job(
122    job: EitherJob,
123    storage: Arc<Storage>,
124    server_id: usize,
125) -> impl Future<Item = (), Error = Error> {
126    let storage = storage.clone();
127
128    poll_fn(move || {
129        let job = job.clone();
130        let storage = storage.clone();
131
132        blocking(move || {
133            let job = match job {
134                EitherJob::New(new_job) => storage.assign_id(new_job, server_id)?,
135                EitherJob::Existing(job) => job,
136            };
137
138            if job.is_pending() {
139                info!("Storing pending job, {}", job.id());
140            }
141            if job.is_finished() {
142                info!("Finished job {}", job.id());
143            }
144            if job.is_failed() {
145                info!("Job failed {}", job.id());
146            }
147
148            storage.store_job(job, server_id).map_err(Error::from)
149        })
150        .map_err(Error::from)
151    })
152    .then(coerce)
153}
154
155struct ResetPullConfig {
156    server_id: usize,
157    address: String,
158    storage: Arc<Storage>,
159    config: Arc<Config>,
160}
161
162impl ResetPullConfig {
163    fn rebuild(self) -> impl Future<Item = (), Error = ()> {
164        Delay::new(tokio::clock::now() + Duration::from_secs(5))
165            .from_err()
166            .and_then(move |_| self.build())
167            .map_err(|e| error!("Error restarting puller, {}", e))
168    }
169
170    fn build(self) -> impl Future<Item = (), Error = Error> {
171        Pull::builder(self.config.context.clone())
172            .bind(&self.address)
173            .build()
174            .map(|puller| {
175                let config = PullConfig {
176                    server_id: self.server_id,
177                    puller,
178                    address: self.address,
179                    storage: self.storage,
180                    config: self.config,
181                };
182
183                tokio::spawn(config.run());
184            })
185            .from_err()
186    }
187}