Skip to main content

alopex_server/ops/
memory.rs

1use std::env;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use alopex_sql::executor::memory::{MemoryPolicy, SpillMetricsSink, SpillPolicy as SqlSpillPolicy};
6
7use crate::error::{Result, ServerError};
8use crate::metrics::Metrics;
9
10#[derive(Clone, Debug)]
11pub enum SpillPolicy {
12    FailFast,
13    SpillToDisk { directory: PathBuf },
14}
15
16#[derive(Clone)]
17pub struct MemoryControlPolicy {
18    limit_bytes: Option<u64>,
19    spill_policy: SpillPolicy,
20    metrics: Option<Metrics>,
21}
22
23impl MemoryControlPolicy {
24    pub fn from_env() -> Self {
25        let limit_bytes = env::var("ALOPEX_MEMORY_LIMIT_BYTES")
26            .ok()
27            .and_then(|val| val.parse::<u64>().ok());
28
29        let policy = env::var("ALOPEX_MEMORY_SPILL_POLICY")
30            .unwrap_or_else(|_| "fail_fast".to_string())
31            .to_ascii_lowercase();
32        let spill_dir = env::var("ALOPEX_MEMORY_SPILL_DIR").ok().map(PathBuf::from);
33
34        let spill_policy = match policy.as_str() {
35            "spill" | "spill_to_disk" | "spill-to-disk" => spill_dir
36                .map(|directory| SpillPolicy::SpillToDisk { directory })
37                .unwrap_or(SpillPolicy::FailFast),
38            _ => SpillPolicy::FailFast,
39        };
40
41        Self {
42            limit_bytes,
43            spill_policy,
44            metrics: None,
45        }
46    }
47
48    pub fn from_env_with_metrics(metrics: Metrics) -> Self {
49        Self::from_env().with_metrics(metrics)
50    }
51
52    pub fn with_metrics(mut self, metrics: Metrics) -> Self {
53        self.metrics = Some(metrics);
54        self
55    }
56
57    pub fn limit_bytes(&self) -> Option<u64> {
58        self.limit_bytes
59    }
60
61    pub fn spill_policy(&self) -> &SpillPolicy {
62        &self.spill_policy
63    }
64
65    pub fn sql_policy(&self) -> Option<MemoryPolicy> {
66        let limit = self.limit_bytes?;
67        let spill_policy = match &self.spill_policy {
68            SpillPolicy::FailFast => SqlSpillPolicy::FailFast,
69            SpillPolicy::SpillToDisk { directory } => SqlSpillPolicy::SpillToDisk {
70                directory: directory.clone(),
71            },
72        };
73        let mut policy = MemoryPolicy::new(Some(limit), spill_policy);
74        if let Some(metrics) = &self.metrics {
75            policy = policy.with_metrics(Arc::new(MetricsSpillSink {
76                metrics: metrics.clone(),
77            }));
78        }
79        Some(policy)
80    }
81
82    pub fn enforce_output_bytes(&self, bytes: u64) -> Result<()> {
83        let Some(limit) = self.limit_bytes else {
84            return Ok(());
85        };
86        if bytes <= limit {
87            return Ok(());
88        }
89        self.enforce_limit(limit, bytes)
90    }
91
92    fn enforce_limit(&self, limit: u64, bytes: u64) -> Result<()> {
93        match &self.spill_policy {
94            SpillPolicy::FailFast => Err(ServerError::PayloadTooLarge(format!(
95                "query memory limit exceeded: {bytes} bytes (limit {limit})"
96            ))),
97            SpillPolicy::SpillToDisk { .. } => Err(ServerError::PayloadTooLarge(format!(
98                "query output exceeds memory limit: {bytes} bytes (limit {limit})"
99            ))),
100        }
101    }
102}
103
104impl std::fmt::Debug for MemoryControlPolicy {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("MemoryControlPolicy")
107            .field("limit_bytes", &self.limit_bytes)
108            .field("spill_policy", &self.spill_policy)
109            .finish()
110    }
111}
112
113struct MetricsSpillSink {
114    metrics: Metrics,
115}
116
117impl SpillMetricsSink for MetricsSpillSink {
118    fn record_spill(&self, bytes: u64, files: u64) {
119        self.metrics.record_spill(bytes, files);
120    }
121}