eventsourced_nats/
lib.rs

//! [EventLog](eventsourced::event_log::EventLog) and
//! [SnapshotStore](eventsourced::snapshot_store::SnapshotStore) implementations based upon
//! [NATS](https://nats.io/).
3
4mod event_log;
5mod snapshot_store;
6
7pub use event_log::{Config as NatsEventLogConfig, NatsEventLog};
8pub use snapshot_store::{Config as NatsSnapshotStoreConfig, NatsSnapshotStore};
9
10use async_nats::{Client, ConnectOptions};
11use error_ext::BoxError;
12use prost::{DecodeError, EncodeError};
13use secrecy::{ExposeSecret, SecretString};
14use serde::Deserialize;
15use std::path::PathBuf;
16use thiserror::Error;
17
18/// Authentication configuration.
19#[derive(Debug, Clone, Deserialize)]
20#[serde(untagged)]
21pub enum AuthConfig {
22    UserPassword {
23        user: String,
24        password: SecretString,
25    },
26    CredentialsFile(PathBuf),
27}
28
29/// Errors from the [NatsEventLog] or [NatsSnapshotStore].
30#[derive(Debug, Error)]
31pub enum Error {
32    #[error("NATS error: {0}")]
33    Nats(String, #[source] Box<dyn std::error::Error + Send + Sync>),
34
35    /// Event cannot be converted into bytes.
36    #[error("cannot convert event to bytes")]
37    ToBytes(#[source] BoxError),
38
39    /// Bytes cannot be converted to event.
40    #[error("cannot convert bytes to event")]
41    FromBytes(#[source] BoxError),
42
43    /// Snapshot cannot be encoded as Protocol Buffers.
44    #[error("cannot encode snapshot as Protocol Buffers")]
45    EncodeSnapshot(#[from] EncodeError),
46
47    /// Snapshot cannot be decoded from Protocol Buffers.
48    #[error("cannot decode snapshot from Protocol Buffers")]
49    DecodeSnapshot(#[from] DecodeError),
50
51    /// Invalid sequence number.
52    #[error("invalid sequence number")]
53    InvalidNonZeroU64,
54}
55
56/// Create a NATS client.
57pub async fn make_client(auth: Option<&AuthConfig>, server_addr: &str) -> Result<Client, Error> {
58    let mut options = ConnectOptions::new();
59    if let Some(auth) = auth {
60        match auth {
61            AuthConfig::UserPassword { user, password } => {
62                options =
63                    options.user_and_password(user.to_owned(), password.expose_secret().to_owned());
64            }
65
66            AuthConfig::CredentialsFile(credentials) => {
67                options = options
68                    .credentials_file(credentials)
69                    .await
70                    .map_err(|error| {
71                        Error::Nats(
72                            format!(
73                                "cannot read NATS credentials file at {})",
74                                credentials.display()
75                            ),
76                            error.into(),
77                        )
78                    })?;
79            }
80        }
81    }
82    let client = options.connect(server_addr).await.map_err(|error| {
83        Error::Nats(
84            format!("cannot connect to NATS server at {server_addr})"),
85            error.into(),
86        )
87    })?;
88    Ok(client)
89}
90
#[cfg(test)]
pub mod tests {
    use crate::AuthConfig;
    use assert_matches::assert_matches;
    use config::{Config, File, FileFormat};
    use secrecy::ExposeSecret;

    // NOTE(review): public, so presumably shared with tests in sibling modules as the NATS
    // version/image tag to run against — confirm against the callers.
    pub const NATS_VERSION: &str = "2.10-alpine";

    /// A YAML map with `user` and `password` entries must deserialize into
    /// `AuthConfig::UserPassword` carrying exactly these values.
    #[test]
    fn test_deserialize_auth_config() {
        let auth = "user: test\npassword: test";
        let config = Config::builder()
            .add_source(File::from_str(auth, FileFormat::Yaml))
            .build()
            .unwrap();
        let result = config.try_deserialize::<AuthConfig>();
        assert_matches!(
            result,
            Ok(AuthConfig::UserPassword { user, password })
                if user == "test" && password.expose_secret() == "test"
        );
    }
}