Skip to main content

ursula_runtime/
cold_store.rs

1use std::fs;
2use std::io;
3use std::path::Path;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use opendal::{Operator, Scheme};
9use ursula_shard::BucketStreamId;
10use ursula_stream::{ColdChunkRef, ObjectPayloadRef};
11
/// Generic binary MIME type. Not referenced by the code shown here; presumably
/// applied by callers when writing cold objects without a more specific type.
pub(crate) const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream";

/// Process-wide counter appended to every generated object path (chunk and
/// external payload alike) so that two paths built at the same timestamp differ.
static COLD_CHUNK_SEQUENCE: AtomicU64 = AtomicU64::new(0_u64);
14
15#[derive(Clone, Debug)]
16pub struct ColdStore {
17    operator: Operator,
18}
19
20pub type ColdStoreHandle = Arc<ColdStore>;
21
22impl ColdStore {
23    pub fn memory() -> io::Result<Self> {
24        let operator = Operator::via_iter(Scheme::Memory, [])
25            .map_err(|err| io::Error::other(err.to_string()))?;
26        Ok(Self { operator })
27    }
28
29    pub fn fs(root: impl AsRef<Path>) -> io::Result<Self> {
30        let root = root.as_ref();
31        fs::create_dir_all(root)?;
32        let operator = Operator::via_iter(
33            Scheme::Fs,
34            [("root".to_owned(), root.to_string_lossy().to_string())],
35        )
36        .map_err(|err| io::Error::other(err.to_string()))?;
37        Ok(Self { operator })
38    }
39
40    pub fn s3_from_env() -> io::Result<Self> {
41        Self::s3_from_env_with_root(None)
42    }
43
44    pub fn s3_from_env_with_root(root_override: Option<&str>) -> io::Result<Self> {
45        let bucket = std::env::var("URSULA_COLD_S3_BUCKET").map_err(|_| {
46            io::Error::new(
47                io::ErrorKind::InvalidInput,
48                "URSULA_COLD_S3_BUCKET is required when URSULA_COLD_BACKEND=s3",
49            )
50        })?;
51        if bucket.trim().is_empty() {
52            return Err(io::Error::new(
53                io::ErrorKind::InvalidInput,
54                "URSULA_COLD_S3_BUCKET must not be empty",
55            ));
56        }
57
58        let mut builder = opendal::services::S3::default().bucket(&bucket);
59        if let Some(root) = root_override {
60            if !root.trim().is_empty() {
61                builder = builder.root(root);
62            }
63        } else if let Ok(root) = std::env::var("URSULA_COLD_ROOT")
64            && !root.trim().is_empty()
65        {
66            builder = builder.root(&root);
67        }
68        if let Ok(region) = std::env::var("URSULA_COLD_S3_REGION")
69            && !region.trim().is_empty()
70        {
71            builder = builder.region(&region);
72        }
73        if let Ok(endpoint) = std::env::var("URSULA_COLD_S3_ENDPOINT")
74            && !endpoint.trim().is_empty()
75        {
76            builder = builder.endpoint(&endpoint);
77        }
78        if let Ok(access_key_id) = std::env::var("URSULA_COLD_S3_ACCESS_KEY_ID")
79            && !access_key_id.trim().is_empty()
80        {
81            builder = builder.access_key_id(&access_key_id);
82        }
83        if let Ok(secret_access_key) = std::env::var("URSULA_COLD_S3_SECRET_ACCESS_KEY")
84            && !secret_access_key.trim().is_empty()
85        {
86            builder = builder.secret_access_key(&secret_access_key);
87        }
88        if let Ok(session_token) = std::env::var("URSULA_COLD_S3_SESSION_TOKEN")
89            && !session_token.trim().is_empty()
90        {
91            builder = builder.session_token(&session_token);
92        }
93
94        Ok(Self {
95            operator: Operator::new(builder)
96                .map_err(|err| io::Error::other(err.to_string()))?
97                .finish(),
98        })
99    }
100
101    pub fn from_env() -> io::Result<Option<ColdStoreHandle>> {
102        let backend = std::env::var("URSULA_COLD_BACKEND")
103            .unwrap_or_else(|_| "none".to_owned())
104            .to_ascii_lowercase();
105        let store = match backend.as_str() {
106            "none" | "disabled" | "off" => return Ok(None),
107            "memory" | "mem" | "inmem" => Self::memory()?,
108            "fs" => {
109                let root =
110                    std::env::var("URSULA_COLD_ROOT").unwrap_or_else(|_| "data/cold".to_owned());
111                Self::fs(root)?
112            }
113            "s3" => Self::s3_from_env()?,
114            other => {
115                return Err(io::Error::new(
116                    io::ErrorKind::InvalidInput,
117                    format!("unsupported URSULA_COLD_BACKEND '{other}'"),
118                ));
119            }
120        };
121        Ok(Some(Arc::new(store)))
122    }
123
124    pub async fn write_chunk(&self, path: &str, payload: &[u8]) -> io::Result<u64> {
125        if path.trim().is_empty() {
126            return Err(io::Error::new(
127                io::ErrorKind::InvalidInput,
128                "cold chunk path must not be empty",
129            ));
130        }
131        self.operator
132            .write(path, payload.to_vec())
133            .await
134            .map_err(|err| cold_store_io_error(path, err))?;
135        Ok(u64::try_from(payload.len()).expect("payload len fits u64"))
136    }
137
138    pub async fn delete_chunk(&self, path: &str) -> io::Result<()> {
139        if path.trim().is_empty() {
140            return Err(io::Error::new(
141                io::ErrorKind::InvalidInput,
142                "cold chunk path must not be empty",
143            ));
144        }
145        self.operator
146            .delete(path)
147            .await
148            .map_err(|err| cold_store_io_error(path, err))
149    }
150
151    pub async fn remove_all(&self, path: &str) -> io::Result<()> {
152        self.operator
153            .remove_all(path)
154            .await
155            .map_err(|err| cold_store_io_error(path, err))
156    }
157
158    pub async fn read_chunk_range(
159        &self,
160        chunk: &ColdChunkRef,
161        read_start_offset: u64,
162        len: usize,
163    ) -> io::Result<Vec<u8>> {
164        let object = ObjectPayloadRef {
165            start_offset: chunk.start_offset,
166            end_offset: chunk.end_offset,
167            s3_path: chunk.s3_path.clone(),
168            object_size: chunk.object_size,
169        };
170        self.read_object_range(&object, read_start_offset, len)
171            .await
172    }
173
174    pub async fn read_object_range(
175        &self,
176        object: &ObjectPayloadRef,
177        read_start_offset: u64,
178        len: usize,
179    ) -> io::Result<Vec<u8>> {
180        if len == 0 {
181            return Ok(Vec::new());
182        }
183        let len_u64 = u64::try_from(len).map_err(|_| {
184            io::Error::new(io::ErrorKind::InvalidInput, "cold read length exceeds u64")
185        })?;
186        let read_end = read_start_offset.checked_add(len_u64).ok_or_else(|| {
187            io::Error::new(io::ErrorKind::InvalidInput, "cold read range overflow")
188        })?;
189        if read_start_offset < object.start_offset || read_end > object.end_offset {
190            return Err(io::Error::new(
191                io::ErrorKind::InvalidInput,
192                format!(
193                    "cold read range [{read_start_offset}..{read_end}) is outside object segment [{}..{})",
194                    object.start_offset, object.end_offset
195                ),
196            ));
197        }
198        let object_start = read_start_offset - object.start_offset;
199        let object_end = object_start.checked_add(len_u64).ok_or_else(|| {
200            io::Error::new(io::ErrorKind::InvalidInput, "cold read range overflow")
201        })?;
202        if object_end > object.object_size {
203            return Err(io::Error::new(
204                io::ErrorKind::InvalidData,
205                format!(
206                    "cold read range [{object_start}..{object_end}) is outside object '{}' size {}",
207                    object.s3_path, object.object_size
208                ),
209            ));
210        }
211        let bytes = self
212            .operator
213            .read_with(&object.s3_path)
214            .range(object_start..object_end)
215            .await
216            .map_err(|err| cold_store_io_error(&object.s3_path, err))?
217            .to_bytes();
218        if bytes.len() != len {
219            return Err(io::Error::new(
220                io::ErrorKind::InvalidData,
221                format!(
222                    "cold object '{}' returned {} bytes for requested range [{}..{})",
223                    object.s3_path,
224                    bytes.len(),
225                    object_start,
226                    object_end
227                ),
228            ));
229        }
230        Ok(bytes.to_vec())
231    }
232}
233
234fn cold_store_io_error(path: &str, err: opendal::Error) -> io::Error {
235    io::Error::other(format!("cold object '{path}': {err}"))
236}
237
238pub fn new_cold_chunk_path(
239    stream_id: &BucketStreamId,
240    start_offset: u64,
241    end_offset: u64,
242) -> String {
243    let unix_nanos = SystemTime::now()
244        .duration_since(UNIX_EPOCH)
245        .map(|duration| duration.as_nanos())
246        .unwrap_or(0);
247    let sequence = COLD_CHUNK_SEQUENCE.fetch_add(1, Ordering::Relaxed);
248    format!(
249        "{stream_id}/chunks/{start_offset:016x}-{end_offset:016x}-{unix_nanos:032x}-{sequence:016x}.bin"
250    )
251}
252
253pub fn new_external_payload_path(stream_id: &BucketStreamId) -> String {
254    let unix_nanos = SystemTime::now()
255        .duration_since(UNIX_EPOCH)
256        .map(|duration| duration.as_nanos())
257        .unwrap_or(0);
258    let sequence = COLD_CHUNK_SEQUENCE.fetch_add(1, Ordering::Relaxed);
259    format!("{stream_id}/external/{unix_nanos:032x}-{sequence:016x}.bin")
260}