otlp_arrow_library/config/
types.rs1use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::path::PathBuf;
8
9use crate::error::OtlpConfigError;
10
11use secrecy::SecretString;
13
14use url::Url;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
19#[serde(rename_all = "snake_case")]
20pub enum ForwardingProtocol {
21 #[default]
23 Protobuf,
24 ArrowFlight,
26}
27
28#[derive(Debug, Clone, Deserialize, Serialize)]
30pub struct ProtocolConfig {
31 #[serde(default = "default_protobuf_enabled")]
33 pub protobuf_enabled: bool,
34
35 #[serde(default = "default_protobuf_port")]
37 pub protobuf_port: u16,
38
39 #[serde(default = "default_arrow_flight_enabled")]
41 pub arrow_flight_enabled: bool,
42
43 #[serde(default = "default_arrow_flight_port")]
45 pub arrow_flight_port: u16,
46
47 #[serde(default = "default_sdk_extraction_enabled")]
54 pub sdk_extraction_enabled: bool,
55}
56
57impl Default for ProtocolConfig {
58 fn default() -> Self {
59 Self {
60 protobuf_enabled: default_protobuf_enabled(),
61 protobuf_port: default_protobuf_port(),
62 arrow_flight_enabled: default_arrow_flight_enabled(),
63 arrow_flight_port: default_arrow_flight_port(),
64 sdk_extraction_enabled: default_sdk_extraction_enabled(),
65 }
66 }
67}
68
69impl ProtocolConfig {
70 pub fn validate(&self) -> Result<(), OtlpConfigError> {
72 if !self.protobuf_enabled && !self.arrow_flight_enabled {
74 return Err(OtlpConfigError::ValidationFailed(
75 "At least one protocol must be enabled".to_string(),
76 ));
77 }
78
79 if self.protobuf_port == 0 {
81 return Err(OtlpConfigError::ValidationFailed(
82 "Protobuf port must be between 1 and 65535".to_string(),
83 ));
84 }
85
86 if self.arrow_flight_port == 0 {
87 return Err(OtlpConfigError::ValidationFailed(
88 "Arrow Flight port must be between 1 and 65535".to_string(),
89 ));
90 }
91
92 if self.protobuf_enabled
94 && self.arrow_flight_enabled
95 && self.protobuf_port == self.arrow_flight_port
96 {
97 return Err(OtlpConfigError::ValidationFailed(
98 "Protobuf and Arrow Flight ports must be different when both protocols are enabled"
99 .to_string(),
100 ));
101 }
102
103 Ok(())
104 }
105}
106
/// Serde default for `ProtocolConfig::protobuf_enabled`.
fn default_protobuf_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
110
/// Serde default for `ProtocolConfig::protobuf_port`.
fn default_protobuf_port() -> u16 {
    const PORT: u16 = 4317;
    PORT
}
114
/// Serde default for `ProtocolConfig::arrow_flight_enabled`.
fn default_arrow_flight_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
118
/// Serde default for `ProtocolConfig::arrow_flight_port`.
fn default_arrow_flight_port() -> u16 {
    const PORT: u16 = 4318;
    PORT
}
122
/// Serde default for `ProtocolConfig::sdk_extraction_enabled`.
fn default_sdk_extraction_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
126
127#[derive(Debug, Clone, Deserialize, Serialize)]
154pub struct DashboardConfig {
155 #[serde(default = "default_dashboard_enabled")]
157 pub enabled: bool,
158
159 #[serde(default = "default_dashboard_port")]
161 pub port: u16,
162
163 #[serde(default = "default_dashboard_static_dir")]
165 pub static_dir: PathBuf,
166
167 #[serde(default = "default_dashboard_bind_address")]
170 pub bind_address: String,
171
172 #[serde(default)]
176 pub x_frame_options: Option<String>,
177}
178
179impl Default for DashboardConfig {
180 fn default() -> Self {
181 Self {
182 enabled: default_dashboard_enabled(),
183 port: default_dashboard_port(),
184 static_dir: default_dashboard_static_dir(),
185 bind_address: default_dashboard_bind_address(),
186 x_frame_options: None, }
188 }
189}
190
191impl DashboardConfig {
192 pub fn validate(&self) -> Result<(), OtlpConfigError> {
194 if self.enabled {
195 if self.port == 0 {
197 return Err(OtlpConfigError::ValidationFailed(
198 "Dashboard port must be between 1 and 65535".to_string(),
199 ));
200 }
201
202 if self.port == 4317 || self.port == 4318 {
204 return Err(OtlpConfigError::ValidationFailed(
205 "Dashboard port conflicts with gRPC port (4317 or 4318)".to_string(),
206 ));
207 }
208
209 if !self.bind_address.is_empty() {
211 if self.bind_address.parse::<std::net::IpAddr>().is_err() {
213 return Err(OtlpConfigError::ValidationFailed(format!(
214 "Dashboard bind_address must be a valid IP address: {}",
215 self.bind_address
216 )));
217 }
218 }
219
220 if !self.static_dir.exists() {
222 return Err(OtlpConfigError::InvalidOutputDir(format!(
223 "Dashboard static directory does not exist: {}",
224 self.static_dir.display()
225 )));
226 }
227
228 if !self.static_dir.is_dir() {
229 return Err(OtlpConfigError::InvalidOutputDir(format!(
230 "Dashboard static directory is not a directory: {}",
231 self.static_dir.display()
232 )));
233 }
234
235 if let Some(ref xfo) = self.x_frame_options
237 && xfo != "DENY"
238 && xfo != "SAMEORIGIN"
239 {
240 return Err(OtlpConfigError::ValidationFailed(format!(
241 "x_frame_options must be 'DENY' or 'SAMEORIGIN' (got: {})",
242 xfo
243 )));
244 }
245 }
246
247 Ok(())
248 }
249}
250
/// Serde default for `DashboardConfig::enabled`: the dashboard is off unless requested.
fn default_dashboard_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = false;
    ENABLED_BY_DEFAULT
}
254
/// Serde default for `DashboardConfig::port`.
fn default_dashboard_port() -> u16 {
    const PORT: u16 = 8080;
    PORT
}
258
/// Serde default for `DashboardConfig::static_dir` (a relative path).
fn default_dashboard_static_dir() -> PathBuf {
    "./dashboard/dist".into()
}
262
/// Serde default for `DashboardConfig::bind_address`: the IPv4 loopback address.
fn default_dashboard_bind_address() -> String {
    String::from("127.0.0.1")
}
266
267#[derive(Debug, Clone, Deserialize, Serialize)]
308pub struct Config {
309 #[serde(default = "default_output_dir")]
311 pub output_dir: PathBuf,
312
313 #[serde(default = "default_write_interval_secs")]
315 pub write_interval_secs: u64,
316
317 #[serde(default = "default_trace_cleanup_interval_secs")]
319 pub trace_cleanup_interval_secs: u64,
320
321 #[serde(default = "default_metric_cleanup_interval_secs")]
323 pub metric_cleanup_interval_secs: u64,
324
325 #[serde(default = "default_max_trace_buffer_size")]
327 pub max_trace_buffer_size: usize,
328
329 #[serde(default = "default_max_metric_buffer_size")]
331 pub max_metric_buffer_size: usize,
332
333 #[serde(default)]
335 pub protocols: ProtocolConfig,
336
337 #[serde(default)]
339 pub forwarding: Option<ForwardingConfig>,
340
341 #[serde(default)]
343 pub dashboard: DashboardConfig,
344
345 #[serde(skip)]
354 pub metric_temporality: Option<opentelemetry_sdk::metrics::Temporality>,
355}
356
357impl Default for Config {
358 fn default() -> Self {
359 Self {
360 output_dir: default_output_dir(),
361 write_interval_secs: default_write_interval_secs(),
362 trace_cleanup_interval_secs: default_trace_cleanup_interval_secs(),
363 metric_cleanup_interval_secs: default_metric_cleanup_interval_secs(),
364 max_trace_buffer_size: default_max_trace_buffer_size(),
365 max_metric_buffer_size: default_max_metric_buffer_size(),
366 protocols: ProtocolConfig::default(),
367 forwarding: None,
368 dashboard: DashboardConfig::default(),
369 metric_temporality: None, }
371 }
372}
373
374impl Config {
375 pub fn validate(&self) -> Result<(), OtlpConfigError> {
377 if self.output_dir.to_string_lossy().is_empty() {
379 return Err(OtlpConfigError::InvalidOutputDir(
380 "Output directory cannot be empty".to_string(),
381 ));
382 }
383
384 let path_str = self.output_dir.to_string_lossy();
386 if path_str.len() > 4096 {
387 return Err(OtlpConfigError::InvalidOutputDir(format!(
388 "Output directory path is too long ({} characters, max 4096)",
389 path_str.len()
390 )));
391 }
392
393 if path_str.contains('\0') {
395 return Err(OtlpConfigError::InvalidOutputDir(
396 "Output directory path cannot contain null bytes".to_string(),
397 ));
398 }
399
400 if self.write_interval_secs == 0 {
402 return Err(OtlpConfigError::InvalidInterval(
403 "Write interval must be greater than 0".to_string(),
404 ));
405 }
406
407 if self.write_interval_secs > 3600 {
409 return Err(OtlpConfigError::InvalidInterval(
410 "Write interval must be less than 3600 seconds (1 hour)".to_string(),
411 ));
412 }
413
414 if self.trace_cleanup_interval_secs == 0 {
416 return Err(OtlpConfigError::InvalidInterval(
417 "Trace cleanup interval must be greater than 0".to_string(),
418 ));
419 }
420
421 if self.metric_cleanup_interval_secs == 0 {
422 return Err(OtlpConfigError::InvalidInterval(
423 "Metric cleanup interval must be greater than 0".to_string(),
424 ));
425 }
426
427 if self.max_trace_buffer_size == 0 || self.max_trace_buffer_size > 1_000_000 {
429 return Err(OtlpConfigError::ValidationFailed(format!(
430 "max_trace_buffer_size must be between 1 and 1,000,000 (got {})",
431 self.max_trace_buffer_size
432 )));
433 }
434 if self.max_metric_buffer_size == 0 || self.max_metric_buffer_size > 1_000_000 {
435 return Err(OtlpConfigError::ValidationFailed(format!(
436 "max_metric_buffer_size must be between 1 and 1,000,000 (got {})",
437 self.max_metric_buffer_size
438 )));
439 }
440
441 if self.trace_cleanup_interval_secs > 86400 {
443 return Err(OtlpConfigError::InvalidInterval(
444 "Trace cleanup interval must be less than 86400 seconds (1 day)".to_string(),
445 ));
446 }
447
448 if self.metric_cleanup_interval_secs > 86400 {
449 return Err(OtlpConfigError::InvalidInterval(
450 "Metric cleanup interval must be less than 86400 seconds (1 day)".to_string(),
451 ));
452 }
453
454 if self.trace_cleanup_interval_secs < 60 {
456 return Err(OtlpConfigError::InvalidInterval(
457 "Trace cleanup interval must be at least 60 seconds".to_string(),
458 ));
459 }
460
461 if self.metric_cleanup_interval_secs < 60 {
462 return Err(OtlpConfigError::InvalidInterval(
463 "Metric cleanup interval must be at least 60 seconds".to_string(),
464 ));
465 }
466
467 self.protocols.validate()?;
469
470 if let Some(ref forwarding) = self.forwarding {
472 forwarding.validate()?;
473 }
474
475 self.dashboard.validate()?;
477
478 Ok(())
479 }
480}
481
482#[derive(Debug, Clone, Default, Deserialize, Serialize)]
508pub struct ForwardingConfig {
509 #[serde(default)]
511 pub enabled: bool,
512
513 pub endpoint_url: Option<String>,
515
516 #[serde(default)]
518 pub protocol: ForwardingProtocol,
519
520 #[serde(default)]
522 pub authentication: Option<AuthConfig>,
523}
524
525impl ForwardingConfig {
526 pub fn validate(&self) -> Result<(), OtlpConfigError> {
528 if self.enabled {
529 if let Some(ref url) = self.endpoint_url {
530 if url.is_empty() {
531 return Err(OtlpConfigError::InvalidUrl(
532 "Endpoint URL cannot be empty when forwarding is enabled".to_string(),
533 ));
534 }
535
536 let parsed_url = Url::parse(url).map_err(|e| {
538 OtlpConfigError::InvalidUrl(format!(
539 "Invalid endpoint URL format: {} (error: {})",
540 url, e
541 ))
542 })?;
543
544 match parsed_url.scheme() {
546 "http" | "https" => {}
547 scheme => {
548 return Err(OtlpConfigError::InvalidUrl(format!(
549 "Endpoint URL must use http or https scheme (got: {}): {}",
550 scheme, url
551 )));
552 }
553 }
554
555 if parsed_url.host().is_none() {
557 return Err(OtlpConfigError::InvalidUrl(format!(
558 "Endpoint URL must include a host: {}",
559 url
560 )));
561 }
562 } else {
563 return Err(OtlpConfigError::MissingRequiredField(
564 "endpoint_url is required when forwarding is enabled".to_string(),
565 ));
566 }
567 }
568
569 Ok(())
570 }
571}
572
573#[derive(Debug, Clone, Deserialize, Serialize)]
617pub struct AuthConfig {
618 pub auth_type: String,
620
621 #[serde(skip_serializing, deserialize_with = "deserialize_secret_credentials")]
632 pub credentials: HashMap<String, SecretString>,
633}
634
635fn deserialize_secret_credentials<'de, D>(
637 deserializer: D,
638) -> Result<HashMap<String, SecretString>, D::Error>
639where
640 D: serde::Deserializer<'de>,
641{
642 use serde::Deserialize;
643 let map: HashMap<String, String> = HashMap::deserialize(deserializer)?;
644 Ok(map
645 .into_iter()
646 .map(|(k, v)| (k, SecretString::new(v)))
647 .collect())
648}
649
650impl AuthConfig {
651 pub fn validate(&self) -> Result<(), OtlpConfigError> {
653 if self.auth_type.is_empty() {
654 return Err(OtlpConfigError::ValidationFailed(
655 "Authentication type cannot be empty".to_string(),
656 ));
657 }
658
659 match self.auth_type.as_str() {
661 "api_key" => {
662 if !self.credentials.contains_key("key") {
663 return Err(OtlpConfigError::MissingRequiredField(
664 "key required for api_key authentication".to_string(),
665 ));
666 }
667 }
668 "bearer_token" => {
669 if !self.credentials.contains_key("token") {
670 return Err(OtlpConfigError::MissingRequiredField(
671 "token required for bearer_token authentication".to_string(),
672 ));
673 }
674 }
675 "basic" => {
676 if !self.credentials.contains_key("username")
677 || !self.credentials.contains_key("password")
678 {
679 return Err(OtlpConfigError::MissingRequiredField(
680 "username and password required for basic auth".to_string(),
681 ));
682 }
683 }
684 _ => {
685 return Err(OtlpConfigError::ValidationFailed(format!(
686 "Unsupported authentication type: {}",
687 self.auth_type
688 )));
689 }
690 }
691
692 Ok(())
693 }
694}
695
696#[derive(Debug, Default)]
698pub struct ConfigBuilder {
699 config: Config,
700}
701
702impl ConfigBuilder {
703 pub fn new() -> Self {
705 Self {
706 config: Config::default(),
707 }
708 }
709
710 pub fn output_dir(mut self, dir: impl Into<PathBuf>) -> Self {
712 self.config.output_dir = dir.into();
713 self
714 }
715
716 pub fn write_interval_secs(mut self, secs: u64) -> Self {
718 self.config.write_interval_secs = secs;
719 self
720 }
721
722 pub fn trace_cleanup_interval_secs(mut self, secs: u64) -> Self {
724 self.config.trace_cleanup_interval_secs = secs;
725 self
726 }
727
728 pub fn metric_cleanup_interval_secs(mut self, secs: u64) -> Self {
730 self.config.metric_cleanup_interval_secs = secs;
731 self
732 }
733
734 pub fn max_trace_buffer_size(mut self, size: usize) -> Self {
736 self.config.max_trace_buffer_size = size;
737 self
738 }
739
740 pub fn max_metric_buffer_size(mut self, size: usize) -> Self {
742 self.config.max_metric_buffer_size = size;
743 self
744 }
745
746 pub fn with_temporality(
750 mut self,
751 temporality: opentelemetry_sdk::metrics::Temporality,
752 ) -> Self {
753 self.config.metric_temporality = Some(temporality);
754 self
755 }
756
757 pub fn protocols(mut self, protocols: ProtocolConfig) -> Self {
759 self.config.protocols = protocols;
760 self
761 }
762
763 pub fn protobuf_enabled(mut self, enabled: bool) -> Self {
765 self.config.protocols.protobuf_enabled = enabled;
766 self
767 }
768
769 pub fn protobuf_port(mut self, port: u16) -> Self {
771 self.config.protocols.protobuf_port = port;
772 self
773 }
774
775 pub fn arrow_flight_enabled(mut self, enabled: bool) -> Self {
777 self.config.protocols.arrow_flight_enabled = enabled;
778 self
779 }
780
781 pub fn arrow_flight_port(mut self, port: u16) -> Self {
783 self.config.protocols.arrow_flight_port = port;
784 self
785 }
786
787 pub fn enable_forwarding(mut self, forwarding: ForwardingConfig) -> Self {
789 self.config.forwarding = Some(forwarding);
790 self
791 }
792
793 pub fn forwarding(mut self, forwarding: Option<ForwardingConfig>) -> Self {
795 self.config.forwarding = forwarding;
796 self
797 }
798
799 pub fn dashboard(mut self, dashboard: DashboardConfig) -> Self {
801 self.config.dashboard = dashboard;
802 self
803 }
804
805 pub fn dashboard_enabled(mut self, enabled: bool) -> Self {
807 self.config.dashboard.enabled = enabled;
808 self
809 }
810
811 pub fn dashboard_port(mut self, port: u16) -> Self {
813 self.config.dashboard.port = port;
814 self
815 }
816
817 pub fn dashboard_static_dir(mut self, dir: impl Into<PathBuf>) -> Self {
819 self.config.dashboard.static_dir = dir.into();
820 self
821 }
822
823 pub fn build(self) -> Result<Config, OtlpConfigError> {
825 self.config.validate()?;
826 Ok(self.config)
827 }
828}
829
/// Serde default for `Config::output_dir` (a relative path).
fn default_output_dir() -> PathBuf {
    "./output_dir".into()
}
834
/// Serde default for `Config::write_interval_secs`, in seconds.
fn default_write_interval_secs() -> u64 {
    const SECONDS: u64 = 5;
    SECONDS
}
838
/// Serde default for `Config::trace_cleanup_interval_secs`: ten minutes, in seconds.
fn default_trace_cleanup_interval_secs() -> u64 {
    const MINUTES: u64 = 10;
    MINUTES * 60
}
842
/// Serde default for `Config::metric_cleanup_interval_secs`: one hour, in seconds.
fn default_metric_cleanup_interval_secs() -> u64 {
    const MINUTES: u64 = 60;
    MINUTES * 60
}
846
/// Serde default for `Config::max_trace_buffer_size`.
fn default_max_trace_buffer_size() -> usize {
    const CAPACITY: usize = 10_000;
    CAPACITY
}
850
/// Serde default for `Config::max_metric_buffer_size`.
fn default_max_metric_buffer_size() -> usize {
    const CAPACITY: usize = 10_000;
    CAPACITY
}