alopex_server/ops/
memory.rs1use std::env;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use alopex_sql::executor::memory::{MemoryPolicy, SpillMetricsSink, SpillPolicy as SqlSpillPolicy};
6
7use crate::error::{Result, ServerError};
8use crate::metrics::Metrics;
9
/// What the server should do when a query would exceed its memory limit.
///
/// `PartialEq`/`Eq` are derived so that configured policies can be compared
/// directly (for example in assertions or when detecting config changes).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SpillPolicy {
    /// Reject the work with an error as soon as the limit is exceeded.
    FailFast,
    /// Spill intermediate data to files under `directory`.
    SpillToDisk {
        /// Directory that receives the spill files.
        directory: PathBuf,
    },
}
15
16#[derive(Clone)]
17pub struct MemoryControlPolicy {
18 limit_bytes: Option<u64>,
19 spill_policy: SpillPolicy,
20 metrics: Option<Metrics>,
21}
22
23impl MemoryControlPolicy {
24 pub fn from_env() -> Self {
25 let limit_bytes = env::var("ALOPEX_MEMORY_LIMIT_BYTES")
26 .ok()
27 .and_then(|val| val.parse::<u64>().ok());
28
29 let policy = env::var("ALOPEX_MEMORY_SPILL_POLICY")
30 .unwrap_or_else(|_| "fail_fast".to_string())
31 .to_ascii_lowercase();
32 let spill_dir = env::var("ALOPEX_MEMORY_SPILL_DIR").ok().map(PathBuf::from);
33
34 let spill_policy = match policy.as_str() {
35 "spill" | "spill_to_disk" | "spill-to-disk" => spill_dir
36 .map(|directory| SpillPolicy::SpillToDisk { directory })
37 .unwrap_or(SpillPolicy::FailFast),
38 _ => SpillPolicy::FailFast,
39 };
40
41 Self {
42 limit_bytes,
43 spill_policy,
44 metrics: None,
45 }
46 }
47
48 pub fn from_env_with_metrics(metrics: Metrics) -> Self {
49 Self::from_env().with_metrics(metrics)
50 }
51
52 pub fn with_metrics(mut self, metrics: Metrics) -> Self {
53 self.metrics = Some(metrics);
54 self
55 }
56
57 pub fn limit_bytes(&self) -> Option<u64> {
58 self.limit_bytes
59 }
60
61 pub fn spill_policy(&self) -> &SpillPolicy {
62 &self.spill_policy
63 }
64
65 pub fn sql_policy(&self) -> Option<MemoryPolicy> {
66 let limit = self.limit_bytes?;
67 let spill_policy = match &self.spill_policy {
68 SpillPolicy::FailFast => SqlSpillPolicy::FailFast,
69 SpillPolicy::SpillToDisk { directory } => SqlSpillPolicy::SpillToDisk {
70 directory: directory.clone(),
71 },
72 };
73 let mut policy = MemoryPolicy::new(Some(limit), spill_policy);
74 if let Some(metrics) = &self.metrics {
75 policy = policy.with_metrics(Arc::new(MetricsSpillSink {
76 metrics: metrics.clone(),
77 }));
78 }
79 Some(policy)
80 }
81
82 pub fn enforce_output_bytes(&self, bytes: u64) -> Result<()> {
83 let Some(limit) = self.limit_bytes else {
84 return Ok(());
85 };
86 if bytes <= limit {
87 return Ok(());
88 }
89 self.enforce_limit(limit, bytes)
90 }
91
92 fn enforce_limit(&self, limit: u64, bytes: u64) -> Result<()> {
93 match &self.spill_policy {
94 SpillPolicy::FailFast => Err(ServerError::PayloadTooLarge(format!(
95 "query memory limit exceeded: {bytes} bytes (limit {limit})"
96 ))),
97 SpillPolicy::SpillToDisk { .. } => Err(ServerError::PayloadTooLarge(format!(
98 "query output exceeds memory limit: {bytes} bytes (limit {limit})"
99 ))),
100 }
101 }
102}
103
104impl std::fmt::Debug for MemoryControlPolicy {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("MemoryControlPolicy")
107 .field("limit_bytes", &self.limit_bytes)
108 .field("spill_policy", &self.spill_policy)
109 .finish()
110 }
111}
112
113struct MetricsSpillSink {
114 metrics: Metrics,
115}
116
117impl SpillMetricsSink for MetricsSpillSink {
118 fn record_spill(&self, bytes: u64, files: u64) {
119 self.metrics.record_spill(bytes, files);
120 }
121}