evidentsource_client/evident_source.rs
1//! EvidentSource - the main entrypoint for connecting to an EvidentSource server.
2
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use futures::stream::{self, StreamExt};
7use futures::Stream;
8use http::Uri;
9use tonic::transport::{Channel, ClientTlsConfig};
10
11use evidentsource_core::domain::{DatabaseError, DatabaseName};
12use evidentsource_core::{DatabaseCatalog, DatabaseIdentity};
13
14use crate::auth::Credentials;
15use crate::connection::Connection;
16use crate::conversions::timestamp_to_datetime;
17use crate::EvidentSourceClient;
18
/// A simple database identity returned from `create_database`.
///
/// Pairs a database's name with the time it was created. Both fields are
/// private; read them through the [`DatabaseIdentity`] implementation on
/// this type.
#[derive(Debug, Clone)]
pub struct DatabaseIdentityImpl {
    /// Name of the database.
    name: DatabaseName,
    /// Creation timestamp of the database, in UTC.
    created_at: DateTime<Utc>,
}
25
26impl DatabaseIdentity for DatabaseIdentityImpl {
27 fn name(&self) -> &DatabaseName {
28 &self.name
29 }
30
31 fn created_at(&self) -> DateTime<Utc> {
32 self.created_at
33 }
34}
35
/// The main entrypoint for connecting to an EvidentSource server.
///
/// `EvidentSource` manages the gRPC connection and provides methods for:
/// - Listing available databases (`DatabaseCatalog` trait)
/// - Creating and deleting databases
/// - Connecting to a specific database for operations
///
/// The value is a thin wrapper around an `EvidentSourceClient`; cloning it
/// clones that client.
///
/// # Example
///
/// ```ignore
/// use evidentsource_client::EvidentSource;
/// use evidentsource_core::domain::DatabaseName;
/// use evidentsource_core::DatabaseCatalog;
/// use futures::StreamExt;
///
/// // Connect to the server
/// let es = EvidentSource::connect_to_server("http://localhost:50051").await?;
///
/// // List all databases. The returned stream is built from an async block
/// // and is therefore not `Unpin`, so pin it before calling `next()`.
/// let mut databases = std::pin::pin!(es.list_databases());
/// while let Some(name) = databases.next().await {
///     println!("Database: {}", name);
/// }
///
/// // Connect to a specific database
/// let db_name = DatabaseName::new("my-db")?;
/// let conn = es.connect(&db_name).await?;
///
/// // Use the connection for operations
/// let latest = conn.latest_database().await?;
/// println!("Latest revision: {}", latest.revision());
/// ```
#[derive(Clone)]
pub struct EvidentSource {
    /// Underlying client used for every server call made through this handle.
    client: EvidentSourceClient,
}
70
71impl EvidentSource {
72 /// Create a builder for configuring and connecting to an EvidentSource server.
73 ///
74 /// # Example
75 ///
76 /// ```ignore
77 /// use evidentsource_client::EvidentSource;
78 /// use std::time::Duration;
79 ///
80 /// let es = EvidentSource::builder("http://localhost:50051")
81 /// .connect_timeout(Duration::from_secs(10))
82 /// .connect()
83 /// .await?;
84 /// ```
85 pub fn builder(addr: &str) -> EvidentSourceBuilder {
86 EvidentSourceBuilder::new(addr)
87 }
88
89 /// Connect to an EvidentSource server without authentication.
90 ///
91 /// This only works if the server has `allow_anonymous=true`.
92 ///
93 /// # Arguments
94 ///
95 /// * `addr` - The server address (e.g., `http://localhost:50051` or `https://api.example.com`)
96 ///
97 /// # Example
98 ///
99 /// ```ignore
100 /// let es = EvidentSource::connect_to_server("http://localhost:50051").await?;
101 /// ```
102 pub async fn connect_to_server(addr: &str) -> Result<Self, crate::Error> {
103 Self::connect_with_auth(addr, Credentials::None).await
104 }
105
106 /// Connect to an EvidentSource server with authentication credentials.
107 ///
108 /// # Arguments
109 ///
110 /// * `addr` - The server address (e.g., `http://localhost:50051` or `https://api.example.com`)
111 /// * `credentials` - Authentication credentials (BearerToken, DevMode, or None)
112 ///
113 /// # Examples
114 ///
115 /// ```ignore
116 /// use evidentsource_client::{EvidentSource, Credentials, DevModeCredentials};
117 ///
118 /// // With bearer token (production - requires TLS)
119 /// let es = EvidentSource::connect_with_auth(
120 /// "https://api.example.com:50051",
121 /// Credentials::BearerToken(my_jwt_token),
122 /// ).await?;
123 ///
124 /// // With DevMode credentials (local development)
125 /// let es = EvidentSource::connect_with_auth(
126 /// "http://localhost:50051",
127 /// Credentials::DevMode(
128 /// DevModeCredentials::new("dev-user@example.com")
129 /// .with_email("dev@example.com")
130 /// .with_display_name("Developer")
131 /// ),
132 /// ).await?;
133 /// ```
134 pub async fn connect_with_auth(
135 addr: &str,
136 credentials: Credentials,
137 ) -> Result<Self, crate::Error> {
138 let client = EvidentSourceClient::with_credentials(addr, credentials).await?;
139 Ok(Self { client })
140 }
141
142 /// Create an EvidentSource instance from an existing client.
143 pub fn from_client(client: EvidentSourceClient) -> Self {
144 Self { client }
145 }
146
147 /// Connect to a specific database.
148 ///
149 /// This returns a `Connection` that maintains a live subscription to
150 /// database updates and implements `DatabaseProvider` and `DatabaseConnection`.
151 ///
152 /// # Arguments
153 ///
154 /// * `database` - The name of the database to connect to
155 ///
156 /// # Example
157 ///
158 /// ```ignore
159 /// let db_name = DatabaseName::new("my-db")?;
160 /// let conn = es.connect(&db_name).await?;
161 /// ```
162 pub async fn connect(&self, database: &DatabaseName) -> Result<Connection, DatabaseError> {
163 Connection::new(self.client.clone(), database.clone()).await
164 }
165
166 /// Get a reference to the underlying gRPC client.
167 ///
168 /// This provides access to low-level operations not exposed through
169 /// the high-level API.
170 pub fn client(&self) -> &EvidentSourceClient {
171 &self.client
172 }
173
174 /// Get a mutable reference to the underlying gRPC client.
175 pub fn client_mut(&mut self) -> &mut EvidentSourceClient {
176 &mut self.client
177 }
178}
179
180impl std::fmt::Debug for EvidentSource {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 f.debug_struct("EvidentSource").finish()
183 }
184}
185
186impl DatabaseCatalog for EvidentSource {
187 type Identity = DatabaseIdentityImpl;
188
189 fn list_databases(&self) -> impl Stream<Item = DatabaseName> {
190 let mut client = self.client.clone();
191
192 stream::once(async move {
193 let result = client.fetch_catalog().await;
194
195 match result {
196 Ok(response_stream) => response_stream
197 .filter_map(|result| async move {
198 match result {
199 Ok(reply) => DatabaseName::new(&reply.database_name).ok(),
200 Err(_) => None,
201 }
202 })
203 .boxed(),
204 Err(_) => stream::empty().boxed(),
205 }
206 })
207 .flatten()
208 }
209
210 fn create_database(
211 &self,
212 name: DatabaseName,
213 ) -> impl std::future::Future<Output = Result<Self::Identity, DatabaseError>> {
214 let mut client = self.client.clone();
215
216 async move {
217 let proto_db = client
218 .create_database(name.to_string())
219 .await
220 .map_err(|e| match e {
221 crate::Error::GrpcStatus(ref status) => {
222 crate::status_mapping::to_database_error(status, &name.to_string())
223 }
224 _ => DatabaseError::ServerError(e.to_string()),
225 })?;
226
227 let db_name = DatabaseName::new(&proto_db.name)?;
228 let created_at = proto_db
229 .created_at
230 .ok_or_else(|| {
231 DatabaseError::ServerError("missing created_at timestamp".to_string())
232 })
233 .and_then(|ts| {
234 timestamp_to_datetime(ts).map_err(|e| {
235 DatabaseError::ServerError(format!("invalid timestamp: {}", e))
236 })
237 })?;
238
239 Ok(DatabaseIdentityImpl {
240 name: db_name,
241 created_at,
242 })
243 }
244 }
245
246 fn delete_database(
247 &self,
248 name: DatabaseName,
249 ) -> impl std::future::Future<Output = Result<(), DatabaseError>> {
250 let mut client = self.client.clone();
251
252 async move {
253 client
254 .delete_database(name.to_string())
255 .await
256 .map_err(|e| match e {
257 crate::Error::GrpcStatus(ref status) => {
258 crate::status_mapping::to_database_error(status, &name.to_string())
259 }
260 _ => DatabaseError::ServerError(e.to_string()),
261 })?;
262
263 Ok(())
264 }
265 }
266}
267
/// TLS configuration for the connection.
///
/// `EvidentSourceBuilder::connect` only ever configures TLS when the server
/// address uses the `https` scheme; for any other scheme this setting has no
/// effect.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TlsConfig {
    /// Use the system's native TLS roots.
    ///
    /// This is also what the builder does for `https` addresses when no
    /// `TlsConfig` has been set.
    Native,
    /// Do not apply any TLS configuration to the channel.
    ///
    /// Note that this skips TLS setup entirely rather than merely relaxing
    /// certificate verification (not recommended for production).
    Disabled,
}
276
/// Configuration for exponential backoff retry behavior.
///
/// This type is plain data: it carries the parameters of a backoff schedule
/// but performs no retrying itself. Two configurations compare equal when all
/// three parameters are equal.
#[derive(Debug, Clone, PartialEq)]
pub struct BackoffConfig {
    /// Initial delay before first retry.
    pub initial: Duration,
    /// Maximum delay between retries.
    pub max: Duration,
    /// Multiplier applied to delay after each retry.
    pub multiplier: f64,
}
287
288impl Default for BackoffConfig {
289 fn default() -> Self {
290 Self {
291 initial: Duration::from_millis(100),
292 max: Duration::from_secs(30),
293 multiplier: 2.0,
294 }
295 }
296}
297
/// Builder for configuring and connecting to an EvidentSource server.
///
/// Every option is optional; a freshly created builder uses
/// `Credentials::None` and leaves TLS, the connect timeout and backoff unset.
///
/// # Example
///
/// ```ignore
/// use evidentsource_client::{EvidentSource, Credentials, DevModeCredentials};
/// use std::time::Duration;
///
/// let es = EvidentSource::builder("http://localhost:50051")
///     .credentials(Credentials::DevMode(
///         DevModeCredentials::new("dev-user@example.com")
///     ))
///     .connect_timeout(Duration::from_secs(10))
///     // No effect for an `http://` address: TLS is only configured when
///     // the address uses the `https` scheme.
///     .tls(TlsConfig::Native)
///     .connect()
///     .await?;
/// ```
#[derive(Debug, Clone)]
pub struct EvidentSourceBuilder {
    /// Server address as given by the caller; parsed into a URI in `connect`.
    addr: String,
    /// Credentials handed to the auth interceptor when connecting.
    credentials: Credentials,
    /// Explicit TLS choice; `None` means "decide from the address scheme".
    tls_config: Option<TlsConfig>,
    /// Connect timeout applied to the channel, if set.
    connect_timeout: Option<Duration>,
    /// Backoff settings. `connect` does not read these; they are only stored
    /// and exposed through `get_backoff`.
    backoff: Option<BackoffConfig>,
}
323
324impl EvidentSourceBuilder {
325 /// Create a new builder with the given server address.
326 pub fn new(addr: &str) -> Self {
327 Self {
328 addr: addr.to_string(),
329 credentials: Credentials::None,
330 tls_config: None,
331 connect_timeout: None,
332 backoff: None,
333 }
334 }
335
336 /// Set the authentication credentials.
337 ///
338 /// # Example
339 ///
340 /// ```ignore
341 /// let es = EvidentSource::builder("http://localhost:50051")
342 /// .credentials(Credentials::DevMode(DevModeCredentials::new("dev-user")))
343 /// .connect()
344 /// .await?;
345 /// ```
346 pub fn credentials(mut self, credentials: Credentials) -> Self {
347 self.credentials = credentials;
348 self
349 }
350
351 /// Set the TLS configuration.
352 ///
353 /// For HTTPS addresses, TLS is enabled by default with native roots.
354 pub fn tls(mut self, config: TlsConfig) -> Self {
355 self.tls_config = Some(config);
356 self
357 }
358
359 /// Set the connection timeout.
360 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
361 self.connect_timeout = Some(timeout);
362 self
363 }
364
365 /// Set the backoff configuration for retries.
366 pub fn backoff(mut self, config: BackoffConfig) -> Self {
367 self.backoff = Some(config);
368 self
369 }
370
371 /// Connect to the server with the configured options.
372 pub async fn connect(self) -> Result<EvidentSource, crate::Error> {
373 let uri = Uri::try_from(&self.addr)?;
374 let mut channel_builder = Channel::builder(uri.clone());
375
376 // Apply connect timeout
377 if let Some(timeout) = self.connect_timeout {
378 channel_builder = channel_builder.connect_timeout(timeout);
379 }
380
381 // Apply TLS configuration
382 let is_https = uri.scheme_str() == Some("https");
383 match self.tls_config {
384 Some(TlsConfig::Native) | None if is_https => {
385 let tls_config = ClientTlsConfig::new()
386 .with_native_roots()
387 .domain_name(uri.host().unwrap_or("localhost"));
388 channel_builder = channel_builder.tls_config(tls_config)?;
389 }
390 Some(TlsConfig::Disabled) => {
391 // TLS disabled, don't configure
392 }
393 _ => {
394 // HTTP connection, no TLS needed
395 }
396 }
397
398 let channel = channel_builder.connect().await?;
399 let interceptor = crate::auth::AuthInterceptor::new(self.credentials);
400 let client =
401 crate::com::evidentsource::evident_source_client::EvidentSourceClient::with_interceptor(
402 channel,
403 interceptor,
404 );
405
406 Ok(EvidentSource {
407 client: EvidentSourceClient { client },
408 })
409 }
410
411 /// Get the configured backoff settings.
412 ///
413 /// This can be used when creating connections to propagate retry settings.
414 pub fn get_backoff(&self) -> Option<&BackoffConfig> {
415 self.backoff.as_ref()
416 }
417}