ursula_runtime/
cold_store.rs1use std::fs;
2use std::io;
3use std::path::Path;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use opendal::{Operator, Scheme};
9use ursula_shard::BucketStreamId;
10use ursula_stream::{ColdChunkRef, ObjectPayloadRef};
11
/// Default content type for cold objects.
/// NOTE(review): not referenced in this file; presumably applied at use sites elsewhere in the
/// crate when no more specific type is known — confirm there.
pub(crate) const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream";
/// Process-wide counter embedded in every generated cold object name (chunk and external
/// payload paths), so two names minted with the same clock reading still differ.
static COLD_CHUNK_SEQUENCE: AtomicU64 = AtomicU64::new(0);
14
15#[derive(Clone, Debug)]
16pub struct ColdStore {
17 operator: Operator,
18}
19
20pub type ColdStoreHandle = Arc<ColdStore>;
21
22impl ColdStore {
23 pub fn memory() -> io::Result<Self> {
24 let operator = Operator::via_iter(Scheme::Memory, [])
25 .map_err(|err| io::Error::other(err.to_string()))?;
26 Ok(Self { operator })
27 }
28
29 pub fn fs(root: impl AsRef<Path>) -> io::Result<Self> {
30 let root = root.as_ref();
31 fs::create_dir_all(root)?;
32 let operator = Operator::via_iter(
33 Scheme::Fs,
34 [("root".to_owned(), root.to_string_lossy().to_string())],
35 )
36 .map_err(|err| io::Error::other(err.to_string()))?;
37 Ok(Self { operator })
38 }
39
40 pub fn s3_from_env() -> io::Result<Self> {
41 Self::s3_from_env_with_root(None)
42 }
43
44 pub fn s3_from_env_with_root(root_override: Option<&str>) -> io::Result<Self> {
45 let bucket = std::env::var("URSULA_COLD_S3_BUCKET").map_err(|_| {
46 io::Error::new(
47 io::ErrorKind::InvalidInput,
48 "URSULA_COLD_S3_BUCKET is required when URSULA_COLD_BACKEND=s3",
49 )
50 })?;
51 if bucket.trim().is_empty() {
52 return Err(io::Error::new(
53 io::ErrorKind::InvalidInput,
54 "URSULA_COLD_S3_BUCKET must not be empty",
55 ));
56 }
57
58 let mut builder = opendal::services::S3::default().bucket(&bucket);
59 if let Some(root) = root_override {
60 if !root.trim().is_empty() {
61 builder = builder.root(root);
62 }
63 } else if let Ok(root) = std::env::var("URSULA_COLD_ROOT")
64 && !root.trim().is_empty()
65 {
66 builder = builder.root(&root);
67 }
68 if let Ok(region) = std::env::var("URSULA_COLD_S3_REGION")
69 && !region.trim().is_empty()
70 {
71 builder = builder.region(®ion);
72 }
73 if let Ok(endpoint) = std::env::var("URSULA_COLD_S3_ENDPOINT")
74 && !endpoint.trim().is_empty()
75 {
76 builder = builder.endpoint(&endpoint);
77 }
78 if let Ok(access_key_id) = std::env::var("URSULA_COLD_S3_ACCESS_KEY_ID")
79 && !access_key_id.trim().is_empty()
80 {
81 builder = builder.access_key_id(&access_key_id);
82 }
83 if let Ok(secret_access_key) = std::env::var("URSULA_COLD_S3_SECRET_ACCESS_KEY")
84 && !secret_access_key.trim().is_empty()
85 {
86 builder = builder.secret_access_key(&secret_access_key);
87 }
88 if let Ok(session_token) = std::env::var("URSULA_COLD_S3_SESSION_TOKEN")
89 && !session_token.trim().is_empty()
90 {
91 builder = builder.session_token(&session_token);
92 }
93
94 Ok(Self {
95 operator: Operator::new(builder)
96 .map_err(|err| io::Error::other(err.to_string()))?
97 .finish(),
98 })
99 }
100
101 pub fn from_env() -> io::Result<Option<ColdStoreHandle>> {
102 let backend = std::env::var("URSULA_COLD_BACKEND")
103 .unwrap_or_else(|_| "none".to_owned())
104 .to_ascii_lowercase();
105 let store = match backend.as_str() {
106 "none" | "disabled" | "off" => return Ok(None),
107 "memory" | "mem" | "inmem" => Self::memory()?,
108 "fs" => {
109 let root =
110 std::env::var("URSULA_COLD_ROOT").unwrap_or_else(|_| "data/cold".to_owned());
111 Self::fs(root)?
112 }
113 "s3" => Self::s3_from_env()?,
114 other => {
115 return Err(io::Error::new(
116 io::ErrorKind::InvalidInput,
117 format!("unsupported URSULA_COLD_BACKEND '{other}'"),
118 ));
119 }
120 };
121 Ok(Some(Arc::new(store)))
122 }
123
124 pub async fn write_chunk(&self, path: &str, payload: &[u8]) -> io::Result<u64> {
125 if path.trim().is_empty() {
126 return Err(io::Error::new(
127 io::ErrorKind::InvalidInput,
128 "cold chunk path must not be empty",
129 ));
130 }
131 self.operator
132 .write(path, payload.to_vec())
133 .await
134 .map_err(|err| cold_store_io_error(path, err))?;
135 Ok(u64::try_from(payload.len()).expect("payload len fits u64"))
136 }
137
138 pub async fn delete_chunk(&self, path: &str) -> io::Result<()> {
139 if path.trim().is_empty() {
140 return Err(io::Error::new(
141 io::ErrorKind::InvalidInput,
142 "cold chunk path must not be empty",
143 ));
144 }
145 self.operator
146 .delete(path)
147 .await
148 .map_err(|err| cold_store_io_error(path, err))
149 }
150
151 pub async fn remove_all(&self, path: &str) -> io::Result<()> {
152 self.operator
153 .remove_all(path)
154 .await
155 .map_err(|err| cold_store_io_error(path, err))
156 }
157
158 pub async fn read_chunk_range(
159 &self,
160 chunk: &ColdChunkRef,
161 read_start_offset: u64,
162 len: usize,
163 ) -> io::Result<Vec<u8>> {
164 let object = ObjectPayloadRef {
165 start_offset: chunk.start_offset,
166 end_offset: chunk.end_offset,
167 s3_path: chunk.s3_path.clone(),
168 object_size: chunk.object_size,
169 };
170 self.read_object_range(&object, read_start_offset, len)
171 .await
172 }
173
174 pub async fn read_object_range(
175 &self,
176 object: &ObjectPayloadRef,
177 read_start_offset: u64,
178 len: usize,
179 ) -> io::Result<Vec<u8>> {
180 if len == 0 {
181 return Ok(Vec::new());
182 }
183 let len_u64 = u64::try_from(len).map_err(|_| {
184 io::Error::new(io::ErrorKind::InvalidInput, "cold read length exceeds u64")
185 })?;
186 let read_end = read_start_offset.checked_add(len_u64).ok_or_else(|| {
187 io::Error::new(io::ErrorKind::InvalidInput, "cold read range overflow")
188 })?;
189 if read_start_offset < object.start_offset || read_end > object.end_offset {
190 return Err(io::Error::new(
191 io::ErrorKind::InvalidInput,
192 format!(
193 "cold read range [{read_start_offset}..{read_end}) is outside object segment [{}..{})",
194 object.start_offset, object.end_offset
195 ),
196 ));
197 }
198 let object_start = read_start_offset - object.start_offset;
199 let object_end = object_start.checked_add(len_u64).ok_or_else(|| {
200 io::Error::new(io::ErrorKind::InvalidInput, "cold read range overflow")
201 })?;
202 if object_end > object.object_size {
203 return Err(io::Error::new(
204 io::ErrorKind::InvalidData,
205 format!(
206 "cold read range [{object_start}..{object_end}) is outside object '{}' size {}",
207 object.s3_path, object.object_size
208 ),
209 ));
210 }
211 let bytes = self
212 .operator
213 .read_with(&object.s3_path)
214 .range(object_start..object_end)
215 .await
216 .map_err(|err| cold_store_io_error(&object.s3_path, err))?
217 .to_bytes();
218 if bytes.len() != len {
219 return Err(io::Error::new(
220 io::ErrorKind::InvalidData,
221 format!(
222 "cold object '{}' returned {} bytes for requested range [{}..{})",
223 object.s3_path,
224 bytes.len(),
225 object_start,
226 object_end
227 ),
228 ));
229 }
230 Ok(bytes.to_vec())
231 }
232}
233
234fn cold_store_io_error(path: &str, err: opendal::Error) -> io::Error {
235 io::Error::other(format!("cold object '{path}': {err}"))
236}
237
238pub fn new_cold_chunk_path(
239 stream_id: &BucketStreamId,
240 start_offset: u64,
241 end_offset: u64,
242) -> String {
243 let unix_nanos = SystemTime::now()
244 .duration_since(UNIX_EPOCH)
245 .map(|duration| duration.as_nanos())
246 .unwrap_or(0);
247 let sequence = COLD_CHUNK_SEQUENCE.fetch_add(1, Ordering::Relaxed);
248 format!(
249 "{stream_id}/chunks/{start_offset:016x}-{end_offset:016x}-{unix_nanos:032x}-{sequence:016x}.bin"
250 )
251}
252
253pub fn new_external_payload_path(stream_id: &BucketStreamId) -> String {
254 let unix_nanos = SystemTime::now()
255 .duration_since(UNIX_EPOCH)
256 .map(|duration| duration.as_nanos())
257 .unwrap_or(0);
258 let sequence = COLD_CHUNK_SEQUENCE.fetch_add(1, Ordering::Relaxed);
259 format!("{stream_id}/external/{unix_nanos:032x}-{sequence:016x}.bin")
260}