streambed/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::io::{self, BufRead, BufReader, Read};
4use std::sync::Arc;
5use std::time::Duration;
6
7use crypto::SALT_SIZE;
8use log::{debug, info, warn};
9use rand::RngCore;
10#[cfg(feature = "reqwest")]
11use reqwest::Certificate;
12use serde::{Deserialize, Serialize};
13use tokio::sync::Notify;
14use tokio::task::JoinHandle;
15use tokio::time;
16
17pub mod commit_log;
18pub mod crypto;
19pub mod delayer;
20pub mod secret_store;
21
22/// Read a line from an async reader.
23pub fn read_line<R: Read>(reader: R) -> Result<String, io::Error> {
24    let mut line = String::new();
25    let mut reader = BufReader::new(reader);
26    reader.read_line(&mut line)?;
27    let len = line.trim_end_matches(&['\r', '\n'][..]).len();
28    line.truncate(len);
29    Ok(line)
30}
31
32/// Read a pem file and return its corresponding Reqwest certificate.
33#[cfg(feature = "reqwest")]
34pub async fn read_pem<R: Read>(mut reader: R) -> Result<Certificate, Box<dyn std::error::Error>> {
35    let mut buf = vec![];
36    reader.read_to_end(&mut buf)?;
37    Certificate::from_pem(&buf).map_err(|e| e.into())
38}
39
40/// A handle to the task created by `authenticate_secret_store` that
41/// can be used to subsequently cancel it.
42pub struct AuthenticationTask {
43    join_handle: Option<JoinHandle<()>>,
44    termination: Arc<Notify>,
45}
46
47/// Performs an initial authentication with the secret store and also spawns a
48/// task to re-authenticate on token expiry. A timeout is provided to cause the
49/// re-authentication to sleep between non-successful authentication attempts.
50pub async fn reauthenticate_secret_store(
51    ss: impl secret_store::SecretStore + 'static,
52    role_id: &str,
53    secret_id: &str,
54    unauthenticated_timeout: Duration,
55    max_lease_duration: Duration,
56) -> AuthenticationTask {
57    let mut approle_auth_reply = ss.approle_auth(role_id, secret_id).await;
58    let auth_role_id = role_id.to_string();
59    let auth_secret_id = secret_id.to_string();
60    let auth_unauthenticated_timeout = unauthenticated_timeout;
61    let termination = Arc::new(Notify::new());
62    let task_termination = termination.clone();
63    let join_handle = tokio::spawn(async move {
64        let mut never_reported_info = true;
65        let mut never_reported_warn = true;
66        loop {
67            match &approle_auth_reply {
68                Ok(approle_auth) => {
69                    if never_reported_info {
70                        info!("Initially authenticated with the secret store");
71                        never_reported_info = false;
72                    }
73                    tokio::select! {
74                        _ = task_termination.notified() => break,
75                        _ = time::sleep(Duration::from_secs(approle_auth.auth.lease_duration).min(max_lease_duration)) => {
76                            approle_auth_reply = ss.approle_auth(&auth_role_id, &auth_secret_id).await;
77                        }
78                    }
79                }
80                Err(e) => {
81                    if never_reported_warn {
82                        warn!(
83                            "Unable to initially authenticate with the secret store. Error: {:?}",
84                            e
85                        );
86                        never_reported_warn = false;
87                    }
88                    tokio::select! {
89                        _ = task_termination.notified() => break,
90                        _ = time::sleep(auth_unauthenticated_timeout) => (),
91                    }
92                }
93            }
94        }
95    });
96    AuthenticationTask {
97        join_handle: Some(join_handle),
98        termination,
99    }
100}
101
102impl AuthenticationTask {
103    /// Cancels a previous authentication task and waits for it to
104    /// finish. The method may be called multiple times, although
105    /// it is effective only on the first call.
106    pub async fn cancel(&mut self) {
107        if let Some(join_handle) = self.join_handle.take() {
108            debug!("Cancelling the original secret store authentication");
109            self.termination.notify_one();
110            let _ = join_handle.await;
111        }
112    }
113}
114
115/// Given a secret store, a path to a secret, get a secret.
116/// The secret is expected to reside in a data field named "value".
117pub async fn get_secret_value(
118    ss: &impl secret_store::SecretStore,
119    secret_path: &str,
120) -> Option<String> {
121    let result = ss.get_secret(secret_path).await;
122    if let Ok(Some(secret_reply)) = result {
123        secret_reply.data.data.get("value").cloned()
124    } else {
125        None
126    }
127}
128
129/// Given a secret store, a path to a secret, and a byte buffer to be decrypted,
130/// decrypt it in place. Returns a decoded structure if decryption
131/// was successful.
132/// The secret is expected to reside in a data field named "value" and
133/// is encoded as a hex string of 32 characters (16 bytes)
134/// The buffer is expected to contain both the salt and the bytes to be decrypted.
135pub async fn decrypt_buf<'a, T, D, DE>(
136    ss: &impl secret_store::SecretStore,
137    secret_path: &str,
138    buf: &'a mut [u8],
139    deserialize: D,
140) -> Option<T>
141where
142    T: Deserialize<'a>,
143    D: FnOnce(&'a [u8]) -> Result<T, DE>,
144{
145    get_secret_value(ss, secret_path)
146        .await
147        .and_then(|secret_value| decrypt_buf_with_secret(secret_value, buf, deserialize))
148}
149
150/// Given a secret, and a byte buffer to be decrypted,
151/// decrypt it in place. Returns a decoded structure if decryption
152/// was successful.
153/// The buffer is expected to contain both the salt and the bytes to be decrypted.
154pub fn decrypt_buf_with_secret<'a, T, D, DE>(
155    secret_value: String,
156    buf: &'a mut [u8],
157    deserialize: D,
158) -> Option<T>
159where
160    T: Deserialize<'a>,
161    D: FnOnce(&'a [u8]) -> Result<T, DE>,
162{
163    if buf.len() >= crypto::SALT_SIZE {
164        if let Ok(s) = hex::decode(secret_value) {
165            let (salt, bytes) = buf.split_at_mut(crypto::SALT_SIZE);
166            crypto::decrypt(bytes, &s.try_into().ok()?, &salt.try_into().ok()?);
167            return deserialize(bytes).ok();
168        }
169    }
170    None
171}
172
173/// Given a secret store, a path to a secret, and a type to be encrypted,
174/// serialize and then encrypt it.
175/// Returns an encrypted buffer prefixed with a random salt if successful.
176/// The secret is expected to reside in a data field named "value" and
177/// is encoded as a hex string of 32 characters (16 bytes)
178/// is encoded as a hex string. Any non alpha-numeric characters are
179/// also filtered out.
180pub async fn encrypt_struct<T, U, F, S, SE>(
181    ss: &impl secret_store::SecretStore,
182    secret_path: &str,
183    serialize: S,
184    rng: F,
185    t: &T,
186) -> Option<Vec<u8>>
187where
188    T: Serialize,
189    S: FnOnce(&T) -> Result<Vec<u8>, SE>,
190    F: FnOnce() -> U,
191    U: RngCore,
192{
193    get_secret_value(ss, secret_path)
194        .await
195        .and_then(|secret_value| encrypt_struct_with_secret(secret_value, serialize, rng, t))
196}
197
198/// Given secret, and a type to be encrypted,
199/// serialize and then encrypt it.
200/// Returns an encrypted buffer prefixed with a random salt if successful.
201pub fn encrypt_struct_with_secret<T, U, F, S, SE>(
202    secret_value: String,
203    serialize: S,
204    rng: F,
205    t: &T,
206) -> Option<Vec<u8>>
207where
208    T: Serialize,
209    S: FnOnce(&T) -> Result<Vec<u8>, SE>,
210    F: FnOnce() -> U,
211    U: RngCore,
212{
213    if let Ok(s) = hex::decode(secret_value) {
214        if let Ok(mut bytes) = serialize(t) {
215            let salt = crypto::salt(&mut (rng)());
216            crypto::encrypt(&mut bytes, &s.try_into().ok()?, &salt);
217            let mut buf = Vec::with_capacity(SALT_SIZE + bytes.len());
218            buf.extend(salt);
219            buf.extend(bytes);
220            return Some(buf);
221        }
222    }
223    None
224}