apalis_core/
backend.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use std::{any::type_name, future::Future};

use futures::Stream;
use serde::{Deserialize, Serialize};
use tower::Service;

use crate::{
    poller::Poller,
    request::State,
    worker::{Context, Worker},
};

/// A source of tasks that workers can consume.
///
/// Both [`Storage`] and [`MessageQueue`] need to implement this trait for
/// workers to be able to consume tasks from them.
///
/// [`Storage`]: crate::storage::Storage
/// [`MessageQueue`]: crate::mq::MessageQueue
pub trait Backend<Req, Res> {
    /// The stream produced by the backend.
    ///
    /// Every item is a `Result` holding an `Option<Req>` on success and a
    /// [`crate::error::Error`] on failure.
    type Stream: Stream<Item = Result<Option<Req>, crate::error::Error>>;

    /// The final decoration of layers, handed out through the returned [`Poller`].
    type Layer;

    /// Consumes the backend and returns a poller that is ready for streaming.
    ///
    /// `Svc` is the service type that accepts `Req` and responds with `Res`.
    /// `worker` is the worker the poller is being created for.
    fn poll<Svc>(self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer>
    where
        Svc: Service<Req, Response = Res>;
}

/// Functionality for reading jobs, workers and statistics from a backend.
///
/// Some backends, especially message queues, may not currently implement this.
pub trait BackendExpose<T>: Sized {
    /// The request type being handled by the backend.
    type Request;

    /// The error returned when reading jobs, workers or stats fails.
    type Error;

    /// Lists all workers that are working on the backend.
    ///
    /// The returned future is required to be `Send`.
    fn list_workers(
        &self,
    ) -> impl Future<Output = Result<Vec<Worker<WorkerState>>, Self::Error>> + Send;

    /// Returns the counts of jobs in different states as a [`Stat`].
    ///
    /// The returned future is required to be `Send`.
    fn stats(&self) -> impl Future<Output = Result<Stat, Self::Error>> + Send;

    /// Fetches jobs persisted in the backend.
    ///
    /// NOTE(review): `status` presumably restricts the listing to jobs in that
    /// [`State`], and `page` presumably selects which page of results to
    /// return; page size and base index are up to the implementor — confirm
    /// against the concrete backends.
    ///
    /// The returned future is required to be `Send`.
    fn list_jobs(
        &self,
        status: &State,
        page: i32,
    ) -> impl Future<Output = Result<Vec<Self::Request>, Self::Error>> + Send;
}

/// The current statistics of a backend: one task counter per state.
///
/// `Default` yields a value with every counter set to zero.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Stat {
    /// Number of pending tasks
    pub pending: usize,
    /// Number of running tasks
    pub running: usize,
    /// Number of dead tasks
    pub dead: usize,
    /// Number of failed tasks
    pub failed: usize,
    /// Number of successful tasks
    pub success: usize,
}

/// A serializable description of a worker's state.
#[derive(Debug, Deserialize, Serialize)]
pub struct WorkerState {
    /// The type of task the worker consumes; useful for display and filtering
    pub r#type: String,
    /// The type of the job stream; [`WorkerState::new`] fills this in with the
    /// type name of its `S` parameter
    pub source: String,
    // TODO: also record the layers that were loaded for the worker
    // (`pub layers: Vec<Layer>`) and a `last_seen: Timestamp`.
}
impl WorkerState {
    /// Builds a new state for a worker.
    ///
    /// `r#type` is stored as given. The `source` field is set to the name of
    /// the type parameter `S` as reported by [`type_name`], so callers pick
    /// the source with a turbofish: `WorkerState::new::<MySource>(..)`.
    pub fn new<S>(r#type: String) -> Self {
        // `type_name` returns a `&'static str`; the struct stores an owned copy.
        let source = type_name::<S>().to_owned();
        Self { r#type, source }
    }
}