1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use crate::ScouterServerConfig;
use scouter_types::StorageType;
use serde::Serialize;
use std::path::PathBuf;
/// Object-storage configuration for the server.
///
/// The `Default` implementation fills every field from environment variables,
/// substituting a built-in fallback when a variable is unset or cannot be parsed.
#[derive(Debug, Clone, Serialize)]
pub struct ObjectStorageSettings {
/// Storage location. `Default` reads it from `SCOUTER_STORAGE_URI`
/// and falls back to `./scouter_storage`.
pub storage_uri: String,
/// Storage backend kind. `Default` derives it from `storage_uri` via
/// `ScouterServerConfig::get_storage_type`.
pub storage_type: StorageType,
/// Cloud region. `Default` reads it from `AWS_REGION` and falls back to `us-east-1`.
pub region: String, // this is aws specific
/// How often the Delta Lake compaction (Z-ORDER optimize) runs for trace tables. Default: 24h.
///
/// `Default` reads the value from `SCOUTER_TRACE_COMPACTION_INTERVAL_HOURS`.
pub trace_compaction_interval_hours: u64,
/// How often the span buffer flushes to Delta Lake. Default: 5s.
///
/// `Default` reads the value from `SCOUTER_TRACE_FLUSH_INTERVAL_SECS`.
pub trace_flush_interval_secs: u64,
/// How often each pod refreshes the Delta table snapshot from shared storage. Default: 10s.
///
/// `Default` reads the value from `SCOUTER_TRACE_REFRESH_INTERVAL_SECS`.
///
/// In multi-pod deployments (e.g. K8s with a dedicated writer pod and reader pods),
/// the reader pods' in-memory Delta log snapshot becomes stale after the writer commits
/// new data. This ticker calls `update_incremental()` on the DeltaTable and re-registers
/// the SessionContext so queries return fresh results without a pod restart.
pub trace_refresh_interval_secs: u64,
}
impl Default for ObjectStorageSettings {
    /// Builds the settings from the process environment.
    ///
    /// | Field | Variable | Fallback |
    /// |---|---|---|
    /// | `storage_uri` | `SCOUTER_STORAGE_URI` | `./scouter_storage` |
    /// | `region` | `AWS_REGION` | `us-east-1` |
    /// | `trace_compaction_interval_hours` | `SCOUTER_TRACE_COMPACTION_INTERVAL_HOURS` | `24` |
    /// | `trace_flush_interval_secs` | `SCOUTER_TRACE_FLUSH_INTERVAL_SECS` | `5` |
    /// | `trace_refresh_interval_secs` | `SCOUTER_TRACE_REFRESH_INTERVAL_SECS` | `10` |
    ///
    /// `storage_type` is not read from the environment; it is derived from the
    /// resolved `storage_uri` via `ScouterServerConfig::get_storage_type`.
    /// A numeric variable that is set but does not parse as `u64` silently
    /// yields the fallback.
    fn default() -> Self {
        // Reads `key` as a u64; an unset, non-Unicode, or unparsable value gives `fallback`.
        fn u64_from_env(key: &str, fallback: u64) -> u64 {
            match std::env::var(key) {
                Ok(raw) => raw.parse().unwrap_or(fallback),
                Err(_) => fallback,
            }
        }

        let storage_uri = match std::env::var("SCOUTER_STORAGE_URI") {
            Ok(uri) => uri,
            Err(_) => String::from("./scouter_storage"),
        };
        let storage_type = ScouterServerConfig::get_storage_type(&storage_uri);

        // need to set this for aws objectstore
        let region = match std::env::var("AWS_REGION") {
            Ok(name) => name,
            Err(_) => String::from("us-east-1"),
        };

        Self {
            storage_uri,
            storage_type,
            region,
            trace_compaction_interval_hours: u64_from_env(
                "SCOUTER_TRACE_COMPACTION_INTERVAL_HOURS",
                24,
            ),
            trace_flush_interval_secs: u64_from_env("SCOUTER_TRACE_FLUSH_INTERVAL_SECS", 5),
            trace_refresh_interval_secs: u64_from_env("SCOUTER_TRACE_REFRESH_INTERVAL_SECS", 10),
        }
    }
}
impl ObjectStorageSettings {
    /// Reads `key` from the environment and parses it as `T`.
    ///
    /// Returns `default` when the variable is unset, is not valid Unicode,
    /// or does not parse as `T`.
    fn parse_env_or<T: std::str::FromStr>(key: &str, default: T) -> T {
        std::env::var(key)
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(default)
    }

    /// Buffer size for trace span batching before flushing to Delta Lake.
    ///
    /// Configurable via `SCOUTER_TRACE_BUFFER_SIZE`. Larger values produce fewer,
    /// bigger Parquet files — reducing Delta log replay cost and file-open overhead
    /// on cloud storage at the expense of slightly longer flush intervals.
    /// Default: 10,000 spans.
    ///
    /// The variable is read on every call; an unparsable value yields the default.
    pub fn trace_buffer_size(&self) -> usize {
        Self::parse_env_or("SCOUTER_TRACE_BUFFER_SIZE", 10_000)
    }

    /// Maximum size (in MB) of the object store range cache used to avoid
    /// redundant cloud round-trips for immutable Parquet footer reads.
    ///
    /// Configurable via `SCOUTER_OBJECT_CACHE_MB`. Default: 64 MB.
    ///
    /// The variable is read on every call; an unparsable value yields the default.
    pub fn object_cache_mb(&self) -> u64 {
        Self::parse_env_or("SCOUTER_OBJECT_CACHE_MB", 64)
    }

    /// Returns the root of the configured storage location.
    ///
    /// For the cloud storage types, a `storage_uri` beginning with `gs://`, `s3://`
    /// or `az://` is reduced to the text between that scheme and the next `/`
    /// (e.g. `s3://bucket/prefix` gives `bucket`). A URI with none of those
    /// schemes is returned unchanged. For local storage the URI is always
    /// returned unchanged.
    pub fn storage_root(&self) -> String {
        // Schemes that are stripped before taking the first path segment.
        const CLOUD_SCHEMES: [&str; 3] = ["gs://", "s3://", "az://"];

        match self.storage_type {
            // For local storage, just return the path directly
            StorageType::Local => self.storage_uri.clone(),
            StorageType::Google | StorageType::Aws | StorageType::Azure => CLOUD_SCHEMES
                .iter()
                .find_map(|scheme| self.storage_uri.strip_prefix(*scheme))
                .map(|rest| {
                    // Keep only what precedes the first '/', or everything if there is none.
                    rest.split_once('/')
                        .map_or(rest, |(root, _)| root)
                        .to_string()
                })
                .unwrap_or_else(|| self.storage_uri.clone()),
        }
    }

    /// Returns `storage_uri` as a canonical filesystem path when storage is local.
    ///
    /// The configured URI is returned unchanged when:
    /// - the storage type is not local,
    /// - the path cannot be canonicalized (for example, it does not exist), or
    /// - the canonical path is not valid UTF-8 and so cannot be represented as a
    ///   `String`.
    ///
    /// This method never panics.
    pub fn canonicalized_path(&self) -> String {
        // Only local paths are canonicalized.
        if self.storage_type != StorageType::Local {
            return self.storage_uri.clone();
        }

        // `canonicalize` already fails for missing paths, so no separate
        // existence check is needed before calling it.
        PathBuf::from(&self.storage_uri)
            .canonicalize()
            .ok()
            .and_then(|resolved| resolved.to_str().map(str::to_owned))
            .unwrap_or_else(|| self.storage_uri.clone())
    }
}