arrow-zerobus-sdk-wrapper 0.6.0

Cross-platform Rust SDK wrapper for Databricks Zerobus with Python bindings
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! Configuration types for Zerobus SDK Wrapper
//!
//! This module defines the configuration structures and validation logic.

use crate::error::ZerobusError;
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

/// Legacy OpenTelemetry configuration
///
/// `OtlpSdkConfig` is documented in this module as the replacement for this
/// type; it is kept here with its original field layout.
///
/// When deserializing, a missing `log_level` is filled in with `"info"`, and any
/// keys that do not match a named field are collected into `extra`.
/// `OtlpConfig::default()` produces the same `"info"` log level.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OtlpConfig {
    /// OTLP endpoint URL (optional, uses default if not provided)
    pub endpoint: Option<String>,
    /// Log level filter for tracing (e.g., "info", "debug", "warn", "error")
    /// Controls which log events are exported via tracing
    /// Default: "info"
    #[serde(default = "default_log_level")]
    pub log_level: String,
    /// Additional OTLP configuration options
    #[serde(flatten)]
    pub extra: std::collections::HashMap<String, serde_json::Value>,
}

impl Default for OtlpConfig {
    /// Returns a configuration with no endpoint, the `"info"` log level and no
    /// extra options.
    ///
    /// Written by hand rather than derived: a derived impl would leave
    /// `log_level` as an empty string, contradicting the documented default and
    /// the value serde fills in for a missing field.
    fn default() -> Self {
        Self {
            endpoint: None,
            // Keep in sync with `default_log_level`, the serde default for this field.
            log_level: String::from("info"),
            extra: std::collections::HashMap::new(),
        }
    }
}

/// Serde default for the `log_level` fields in this module: the `"info"` level.
fn default_log_level() -> String {
    String::from("info")
}

/// OpenTelemetry SDK configuration
///
/// This configuration structure aligns with the otlp-rust-service SDK requirements.
/// It replaces `OtlpConfig` as a breaking change to simplify configuration and
/// directly map to SDK ConfigBuilder fields.
///
/// Field values are not checked on construction or deserialization; call
/// `OtlpSdkConfig::validate` to check them. The per-field constraints listed
/// below are the ones `validate` enforces.
///
/// # Migration from OtlpConfig
///
/// The old `OtlpConfig` structure had:
/// - `endpoint: Option<String>`
/// - `log_level: String`
/// - `extra: HashMap<String, Value>`
///
/// The new `OtlpSdkConfig` structure has:
/// - `endpoint: Option<String>` - OTLP endpoint URL for remote export
/// - `output_dir: Option<PathBuf>` - Output directory for file-based export
/// - `write_interval_secs: u64` - Write interval in seconds (default: 5)
/// - `log_level: String` - Log level for tracing (default: "info")
///
/// The `extra` field has been removed as it's no longer needed with direct SDK config mapping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OtlpSdkConfig {
    /// OTLP endpoint URL for remote export (optional)
    ///
    /// When set, it must start with `https://` or `http://`.
    pub endpoint: Option<String>,
    /// Output directory for file-based export (optional)
    ///
    /// When set, it must not be an empty path.
    pub output_dir: Option<PathBuf>,
    /// Write interval in seconds for file-based export (default: 5)
    ///
    /// Must be greater than 0. Filled in with 5 when absent from deserialized input.
    #[serde(default = "default_write_interval")]
    pub write_interval_secs: u64,
    /// Log level for tracing (default: "info")
    ///
    /// Must be one of "trace", "debug", "info", "warn" or "error"; the
    /// comparison ignores case. Filled in with "info" when absent from
    /// deserialized input.
    #[serde(default = "default_log_level")]
    pub log_level: String,
}

/// Serde default for `OtlpSdkConfig::write_interval_secs`: five seconds.
fn default_write_interval() -> u64 {
    const DEFAULT_WRITE_INTERVAL_SECS: u64 = 5;
    DEFAULT_WRITE_INTERVAL_SECS
}

impl Default for OtlpSdkConfig {
    fn default() -> Self {
        Self {
            endpoint: None,
            output_dir: None,
            write_interval_secs: 5,
            log_level: "info".to_string(),
        }
    }
}

/// Complete configuration for initializing the wrapper
///
/// Represents all configuration needed to initialize a ZerobusWrapper instance,
/// including connection details, observability settings, debug file settings,
/// and retry configuration.
///
/// Build one with `WrapperConfiguration::new` and the `with_*` methods, then
/// call `WrapperConfiguration::validate`. The constraints noted on the fields
/// below are the ones `validate` enforces; fields without such a note are not
/// checked by it.
#[derive(Debug, Clone)]
pub struct WrapperConfiguration {
    /// Zerobus endpoint URL (required)
    ///
    /// Must start with `https://` or `http://`.
    pub zerobus_endpoint: String,
    /// Unity Catalog URL for authentication (required for SDK)
    ///
    /// Not checked by `validate`.
    pub unity_catalog_url: Option<String>,
    /// OAuth2 client ID (required for SDK)
    /// Stored securely to prevent exposure in memory dumps
    ///
    /// Not checked by `validate`.
    pub client_id: Option<SecretString>,
    /// OAuth2 client secret (required for SDK)
    /// Stored securely to prevent exposure in memory dumps
    ///
    /// Not checked by `validate`.
    pub client_secret: Option<SecretString>,
    /// Target table name in Zerobus (required)
    ///
    /// Not checked by `validate`.
    pub table_name: String,
    /// Enable/disable OpenTelemetry observability (default: false)
    ///
    /// Set to `true` by `with_observability`.
    pub observability_enabled: bool,
    /// OpenTelemetry configuration (optional)
    ///
    /// `validate` on this struct does not validate the nested configuration.
    pub observability_config: Option<OtlpSdkConfig>,
    /// Enable/disable debug file output (default: false)
    ///
    /// Set to `true` by `with_debug_output`.
    pub debug_enabled: bool,
    /// Output directory for debug files (required if debug_enabled)
    pub debug_output_dir: Option<PathBuf>,
    /// Debug file flush interval in seconds (default: 5)
    ///
    /// Must be greater than 0.
    pub debug_flush_interval_secs: u64,
    /// Maximum debug file size in bytes before rotation (optional)
    pub debug_max_file_size: Option<u64>,
    /// Maximum retry attempts for transient failures (default: 5)
    ///
    /// Must be greater than 0.
    pub retry_max_attempts: u32,
    /// Base delay in milliseconds for exponential backoff (default: 100)
    pub retry_base_delay_ms: u64,
    /// Maximum delay in milliseconds for exponential backoff (default: 30000)
    ///
    /// Must be greater than or equal to `retry_base_delay_ms`.
    pub retry_max_delay_ms: u64,
    /// Disable Zerobus SDK transmission while maintaining debug file output (default: false)
    ///
    /// When `true`, the wrapper will skip all Zerobus SDK calls (initialization,
    /// stream creation, data transmission) while still writing debug files (Arrow
    /// and Protobuf) if debug output is enabled.
    ///
    /// # Requirements
    /// - When `true`, `debug_enabled` must also be `true`
    /// - Credentials (`client_id`, `client_secret`) are optional when `true`
    ///
    /// # Use Cases
    /// - Local development without network access
    /// - CI/CD testing without credentials
    /// - Performance testing of conversion logic
    pub zerobus_writer_disabled: bool,
}

impl WrapperConfiguration {
    /// Build a configuration for the given endpoint and table with every other
    /// setting at its default
    ///
    /// Defaults: no Unity Catalog URL, no credentials, observability and debug
    /// output disabled, a 5 second debug flush interval, no debug file size
    /// limit, 5 retry attempts with a 100 ms base delay and a 30000 ms maximum
    /// delay, and the Zerobus writer enabled.
    ///
    /// # Arguments
    ///
    /// * `endpoint` - Zerobus endpoint URL
    /// * `table_name` - Target table name
    ///
    /// # Example
    ///
    /// ```no_run
    /// use arrow_zerobus_sdk_wrapper::WrapperConfiguration;
    ///
    /// let config = WrapperConfiguration::new(
    ///     "https://workspace.cloud.databricks.com".to_string(),
    ///     "my_table".to_string(),
    /// );
    /// ```
    pub fn new(endpoint: String, table_name: String) -> Self {
        Self {
            // Connection and authentication
            zerobus_endpoint: endpoint,
            unity_catalog_url: None,
            client_id: None,
            client_secret: None,
            table_name,
            // Observability
            observability_enabled: false,
            observability_config: None,
            // Debug file output
            debug_enabled: false,
            debug_output_dir: None,
            debug_flush_interval_secs: 5,
            debug_max_file_size: None,
            // Retry / backoff
            retry_max_attempts: 5,
            retry_base_delay_ms: 100,
            retry_max_delay_ms: 30000,
            // Writer mode
            zerobus_writer_disabled: false,
        }
    }

    /// Attach OAuth2 credentials
    ///
    /// Both values are wrapped in `SecretString` before being stored.
    ///
    /// # Arguments
    ///
    /// * `client_id` - OAuth2 client ID
    /// * `client_secret` - OAuth2 client secret
    pub fn with_credentials(self, client_id: String, client_secret: String) -> Self {
        Self {
            client_id: Some(SecretString::new(client_id)),
            client_secret: Some(SecretString::new(client_secret)),
            ..self
        }
    }

    /// Attach the Unity Catalog URL
    ///
    /// # Arguments
    ///
    /// * `url` - Unity Catalog URL
    pub fn with_unity_catalog(self, url: String) -> Self {
        Self {
            unity_catalog_url: Some(url),
            ..self
        }
    }

    /// Enable observability and store its configuration
    ///
    /// Sets `observability_enabled` to `true` in addition to storing `config`.
    ///
    /// # Arguments
    ///
    /// * `config` - OpenTelemetry SDK configuration
    pub fn with_observability(self, config: OtlpSdkConfig) -> Self {
        Self {
            observability_enabled: true,
            observability_config: Some(config),
            ..self
        }
    }

    /// Enable debug file output and store its directory
    ///
    /// Sets `debug_enabled` to `true` in addition to storing `output_dir`.
    ///
    /// # Arguments
    ///
    /// * `output_dir` - Output directory for debug files
    pub fn with_debug_output(self, output_dir: PathBuf) -> Self {
        Self {
            debug_enabled: true,
            debug_output_dir: Some(output_dir),
            ..self
        }
    }

    /// Override the debug flush interval
    ///
    /// # Arguments
    ///
    /// * `interval_secs` - Flush interval in seconds
    pub fn with_debug_flush_interval_secs(self, interval_secs: u64) -> Self {
        Self {
            debug_flush_interval_secs: interval_secs,
            ..self
        }
    }

    /// Override the debug file size limit
    ///
    /// # Arguments
    ///
    /// * `max_size` - Maximum file size in bytes before rotation, or `None` to
    ///   clear the limit
    pub fn with_debug_max_file_size(self, max_size: Option<u64>) -> Self {
        Self {
            debug_max_file_size: max_size,
            ..self
        }
    }

    /// Override the retry settings
    ///
    /// # Arguments
    ///
    /// * `max_attempts` - Maximum retry attempts
    /// * `base_delay_ms` - Base delay in milliseconds for exponential backoff
    /// * `max_delay_ms` - Maximum delay in milliseconds
    pub fn with_retry_config(
        self,
        max_attempts: u32,
        base_delay_ms: u64,
        max_delay_ms: u64,
    ) -> Self {
        Self {
            retry_max_attempts: max_attempts,
            retry_base_delay_ms: base_delay_ms,
            retry_max_delay_ms: max_delay_ms,
            ..self
        }
    }

    /// Switch writer-disabled mode on or off
    ///
    /// # Arguments
    ///
    /// * `disabled` - If `true`, disables Zerobus SDK transmission while maintaining debug output
    ///
    /// # Returns
    ///
    /// Self for method chaining
    ///
    /// # Example
    ///
    /// ```no_run
    /// use arrow_zerobus_sdk_wrapper::WrapperConfiguration;
    /// use std::path::PathBuf;
    ///
    /// let config = WrapperConfiguration::new(
    ///     "https://workspace.cloud.databricks.com".to_string(),
    ///     "my_table".to_string(),
    /// )
    /// .with_debug_output(PathBuf::from("./debug_output"))
    /// .with_zerobus_writer_disabled(true);
    /// ```
    pub fn with_zerobus_writer_disabled(self, disabled: bool) -> Self {
        Self {
            zerobus_writer_disabled: disabled,
            ..self
        }
    }

    /// Check the configuration for invalid or inconsistent values
    ///
    /// Checks run in the order listed under *Errors*; the first failing check
    /// determines the returned error.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` if every check passes, or `Err(ZerobusError)` describing
    /// the first failure.
    ///
    /// # Errors
    ///
    /// Returns `ConfigurationError` if:
    /// - `zerobus_endpoint` does not start with `https://` or `http://`
    /// - `debug_enabled` is true but `debug_output_dir` is not provided
    /// - `zerobus_writer_disabled` is true but `debug_enabled` is false
    /// - `retry_max_attempts` is 0
    /// - `debug_flush_interval_secs` is 0
    /// - `retry_max_delay_ms` is smaller than `retry_base_delay_ms`
    pub fn validate(&self) -> Result<(), ZerobusError> {
        // Endpoint scheme
        let endpoint = self.zerobus_endpoint.as_str();
        let has_http_scheme = endpoint.starts_with("https://") || endpoint.starts_with("http://");
        if !has_http_scheme {
            return Err(ZerobusError::ConfigurationError(format!(
                "zerobus_endpoint must start with 'https://' or 'http://', got: '{}'",
                endpoint
            )));
        }

        // Checks with a fixed message, as (violated, message) pairs. The array
        // order is the reporting priority: the first violated entry is returned.
        let fixed_checks: [(bool, &str); 4] = [
            (
                self.debug_enabled && self.debug_output_dir.is_none(),
                "debug_output_dir is required when debug_enabled is true",
            ),
            (
                self.zerobus_writer_disabled && !self.debug_enabled,
                "debug_enabled must be true when zerobus_writer_disabled is true. Use with_debug_output() to enable debug output.",
            ),
            (
                self.retry_max_attempts == 0,
                "retry_max_attempts must be > 0",
            ),
            (
                self.debug_flush_interval_secs == 0,
                "debug_flush_interval_secs must be > 0",
            ),
        ];
        if let Some(&(_, message)) = fixed_checks.iter().find(|check| check.0) {
            return Err(ZerobusError::ConfigurationError(message.to_string()));
        }

        // Backoff bounds: the cap may not be below the base delay.
        let (base_delay, max_delay) = (self.retry_base_delay_ms, self.retry_max_delay_ms);
        if max_delay < base_delay {
            return Err(ZerobusError::ConfigurationError(format!(
                "retry_max_delay_ms ({}) must be >= retry_base_delay_ms ({})",
                max_delay, base_delay
            )));
        }

        Ok(())
    }
}

impl OtlpSdkConfig {
    /// Check the SDK configuration for invalid values
    ///
    /// Checks run in the order listed under *Errors*; the first failing check
    /// determines the returned error.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` if every check passes, or `Err(ZerobusError)` describing
    /// the first failure.
    ///
    /// # Errors
    ///
    /// Returns `ConfigurationError` if:
    /// - `endpoint` is provided but does not start with `https://` or `http://`
    /// - `output_dir` is provided but is an empty path (no other path check is made)
    /// - `write_interval_secs` is 0
    /// - `log_level`, compared case-insensitively, is not one of
    ///   "trace", "debug", "info", "warn", "error"
    pub fn validate(&self) -> Result<(), ZerobusError> {
        const VALID_LEVELS: [&str; 5] = ["trace", "debug", "info", "warn", "error"];

        // A remote endpoint, when configured, has to carry an HTTP(S) scheme.
        if let Some(url) = self.endpoint.as_deref() {
            let has_http_scheme = url.starts_with("https://") || url.starts_with("http://");
            if !has_http_scheme {
                return Err(ZerobusError::ConfigurationError(format!(
                    "endpoint must start with 'https://' or 'http://', got: '{}'",
                    url
                )));
            }
        }

        // Any non-empty path is accepted, absolute or relative; only the empty
        // path is rejected.
        let output_dir_is_blank =
            matches!(&self.output_dir, Some(dir) if dir.as_os_str().is_empty());
        if output_dir_is_blank {
            return Err(ZerobusError::ConfigurationError(
                "output_dir must not be empty".to_string(),
            ));
        }

        if self.write_interval_secs == 0 {
            return Err(ZerobusError::ConfigurationError(
                "write_interval_secs must be > 0".to_string(),
            ));
        }

        // Level names are matched after lowercasing; the error echoes the
        // value exactly as configured.
        let normalized_level = self.log_level.to_lowercase();
        let level_is_known = VALID_LEVELS
            .iter()
            .any(|candidate| *candidate == normalized_level.as_str());
        if !level_is_known {
            return Err(ZerobusError::ConfigurationError(format!(
                "log_level must be one of {:?}, got: '{}'",
                VALID_LEVELS, self.log_level
            )));
        }

        Ok(())
    }
}