1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
//! This module provides Rust bindings for the [Prometheus remote storage protocol buffer definitions][proto].
//!
//! Go to [docs.rs][docs] to see those binding types.
//!
//! [proto]: https://github.com/prometheus/prometheus/blob/main/prompb/remote.proto
//! [docs]: https://docs.rs/prom-remote-api/latest/prom_remote_api/types/index.html

use async_trait::async_trait;
use std::{fmt::Display, result::Result as StdResult, sync::Arc};

// Private wrapper module around the `prometheus.rs` file that is pulled in
// from the build output directory (`OUT_DIR`) at compile time.
// NOTE(review): presumably generated by the build script from the Prometheus
// `.proto` definitions — confirm against `build.rs`.
mod prometheus {
    include!(concat!(env!("OUT_DIR"), "/prometheus.rs"));
}
// Re-export every public item of the included file at this module's level.
pub use prometheus::*;

/// Errors defined by this module.
///
/// Each variant wraps the underlying error value of the `snap` or `prost`
/// crate; that value is exposed through [`std::error::Error::source`].
#[derive(Debug)]
pub enum Error {
    /// A `snap` error tagged as having occurred on the encoding side.
    SnappyEncode(snap::Error),
    /// A `snap` error tagged as having occurred on the decoding side.
    SnappyDecode(snap::Error),
    /// A `prost` failure while decoding a protobuf message.
    ProtoDecode(prost::DecodeError),
}

/// Convenience alias for a standard `Result` whose error type is this module's [`Error`].
pub type Result<T> = StdResult<T, Error>;

impl Display for Error {
    /// Writes the bare variant name; the wrapped error is not included here
    /// (it is reachable through `source()` instead).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match self {
            Self::SnappyEncode(_) => "SnappyEncode",
            Self::SnappyDecode(_) => "SnappyDecode",
            Self::ProtoDecode(_) => "ProtoDecode",
        };
        f.write_str(label)
    }
}

impl std::error::Error for Error {
    /// Returns the wrapped lower-level error; every variant carries one, so
    /// the result is always `Some`.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Both snappy variants hold the same payload type, so they share an arm.
        let cause: &(dyn std::error::Error + 'static) = match self {
            Self::SnappyEncode(inner) | Self::SnappyDecode(inner) => inner,
            Self::ProtoDecode(inner) => inner,
        };
        Some(cause)
    }
}

/// Integration point for remote storage, Prometheus's solution for long-term storage.
///
/// A third-party storage backend can be integrated with Prometheus by implementing this trait.
/// <https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations>
#[async_trait]
pub trait RemoteStorage: Sync {
    /// Error type yielded by failed writes and reads.
    type Err: Send;

    /// Request-scoped value passed to every write and read call.
    type Context: Send + Sync;

    /// Writes the samples carried by `req` to remote storage.
    async fn write(&self, ctx: Self::Context, req: WriteRequest) -> StdResult<(), Self::Err>;

    /// Answers a single query taken from a [ReadRequest](crate::types::ReadRequest).
    ///
    /// Note: the Prometheus remote protocol sends multiple queries by default;
    /// use [read](crate::types::RemoteStorage::read) to serve a whole ReadRequest.
    async fn process_query(
        &self,
        ctx: &Self::Context,
        q: Query,
    ) -> StdResult<QueryResult, Self::Err>;

    /// Reads samples from remote storage.
    ///
    /// A [ReadRequest](crate::types::ReadRequest) may contain more than one sub
    /// [query](crate::types::Query). The default implementation hands each of them to
    /// [process_query](crate::types::RemoteStorage::process_query), awaits all of them
    /// together, and returns the results in query order, or the first error in query order.
    async fn read(
        &self,
        ctx: Self::Context,
        req: ReadRequest,
    ) -> StdResult<ReadResponse, Self::Err> {
        // One future per sub-query; all of them borrow the same context.
        let pending = req
            .queries
            .into_iter()
            .map(|query| async { self.process_query(&ctx, query).await });

        // Wait for every sub-query to finish; outputs keep the input order.
        let outcomes = futures::future::join_all(pending).await;

        // Fold the per-query outcomes into one result, stopping at the first error.
        let results = outcomes
            .into_iter()
            .collect::<StdResult<Vec<_>, Self::Err>>()?;

        Ok(ReadResponse { results })
    }
}

/// Reference-counted handle to a type-erased [`RemoteStorage`] implementation
/// that is `Send + Sync`, with request context type `C` and error type `E`.
pub type RemoteStorageRef<C, E> = Arc<dyn RemoteStorage<Err = E, Context = C> + Send + Sync>;