1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
// SPDX-License-Identifier: Apache-2.0
//! S3 storage implementation.
use std::{
io,
sync::{Arc, OnceLock},
};
use aws_sdk_s3::{Client, config::BehaviorVersion};
use aws_smithy_async::rt::sleep::TokioSleep;
use runtime_bridge::RuntimeBridge;
/// S3-compatible object storage backend.
///
/// Stores Heddle objects (blobs, trees, states, actions) in an S3 bucket.
/// Blobs and trees are keyed by the hex form of their content hash; states
/// and actions are keyed by their change id / action id (see the `*_key`
/// helpers on this type).
///
/// The client and the runtime bridge both sit behind `Arc`, so clones of
/// a store share them rather than duplicating them.
#[derive(Clone)]
pub struct S3Store {
/// AWS SDK client used to issue S3 requests.
pub(super) client: Arc<Client>,
/// Name of the bucket that requests are sent to.
pub(super) bucket: String,
/// Key prefix prepended verbatim to every object key. No separator is
/// inserted between it and the `blobs/`, `trees/`, `states/` or
/// `actions/` segment, so it should be empty or end with `/`.
pub(super) prefix: String,
/// Lazy worker-thread + Tokio runtime that drives async S3 calls off
/// the caller's runtime. See [`RuntimeBridge`]. Wrapped in
/// `Arc<OnceLock<_>>` so every clone of `S3Store` shares one bridge
/// (one worker thread) and the spawn cost is paid on first sync use.
pub(super) bridge: Arc<OnceLock<RuntimeBridge>>,
}
impl S3Store {
/// Create a new S3 store with the given client, bucket, and prefix.
pub fn new(client: Client, bucket: impl Into<String>, prefix: impl Into<String>) -> Self {
Self {
client: Arc::new(client),
bucket: bucket.into(),
prefix: prefix.into(),
bridge: Arc::new(OnceLock::new()),
}
}
/// Create a builder for configuring S3 storage.
pub fn builder() -> S3StoreBuilder {
S3StoreBuilder::new()
}
/// Get the S3 key for a blob.
pub(super) fn blob_key(&self, hash: &crate::object::ContentHash) -> String {
format!("{}blobs/{}.bin", self.prefix, hash.to_hex())
}
/// Get the S3 key for a tree.
pub(super) fn tree_key(&self, hash: &crate::object::ContentHash) -> String {
format!("{}trees/{}.bin", self.prefix, hash.to_hex())
}
/// Get the S3 key for a state.
pub(super) fn state_key(&self, id: &crate::object::ChangeId) -> String {
format!("{}states/{}.bin", self.prefix, id.to_string_full())
}
/// Get the S3 key for an action.
pub(super) fn action_key(&self, id: &crate::object::ActionId) -> String {
format!("{}actions/{}.bin", self.prefix, id)
}
/// Lazily-initialized accessor for the runtime bridge.
///
/// The synchronous `ObjectStore` methods route every `.send().await`
/// call through this bridge so they can be invoked from inside a
/// caller's Tokio runtime (`#[tokio::main]`, `#[tokio::test]`, a
/// daemon worker, etc.) without the nested-`block_on` panic that
/// `Handle::try_current().block_on(...)` triggers. See
/// [`runtime_bridge::RuntimeBridge`] for the design rationale.
pub(super) fn bridge(&self) -> crate::store::Result<&RuntimeBridge> {
if let Some(bridge) = self.bridge.get() {
return Ok(bridge);
}
let new = RuntimeBridge::with_thread_name("heddle-s3-bridge").map_err(|err| {
crate::store::StoreError::Io(io::Error::other(format!(
"S3 runtime bridge: spawn worker thread: {err}",
)))
})?;
// If a concurrent caller already populated the slot, `set` drops
// our worker; its tx side dies with it and the spawned thread
// exits cleanly when `rx.recv()` returns None. First-use only, so
// the wasted spawn is acceptable in exchange for keeping
// `bridge()` lock-free on the hot path.
let _ = self.bridge.set(new);
Ok(self
.bridge
.get()
.expect("OnceLock populated above or by a concurrent caller"))
}
/// Run `future` on the shared S3 runtime bridge and translate its
/// completion into a `Result<T, StoreError>`. Folds the bridge's
/// own liveness errors (`BridgeError::WorkerDead` /
/// `BridgeError::ResponseLost`) and the future's `Result<T, StoreError>`
/// payload into one `?`-friendly return; the bridge-error message
/// labels itself so a stuck SDK call isn't confused with a panicked
/// worker.
pub(super) fn block<F, T>(&self, future: F) -> crate::store::Result<T>
where
F: std::future::Future<Output = crate::store::Result<T>> + Send + 'static,
T: Send + 'static,
{
self.bridge()?.block_on(future).map_err(|err| {
crate::store::StoreError::Io(io::Error::other(format!("S3 runtime bridge: {err}")))
})?
}
/// List objects with a given prefix.
pub(super) async fn list_with_prefix(&self, prefix: &str) -> crate::store::Result<Vec<String>> {
let full_prefix = format!("{}{}", self.prefix, prefix);
let mut keys = Vec::new();
let mut continuation_token = None;
loop {
let mut request = self
.client
.list_objects_v2()
.bucket(&self.bucket)
.prefix(&full_prefix);
if let Some(token) = continuation_token {
request = request.continuation_token(token);
}
let response = request.send().await.map_err(|e| {
crate::store::StoreError::Io(std::io::Error::other(format!(
"S3 list_objects_v2 failed: {}",
e
)))
})?;
if let Some(contents) = response.contents {
for obj in contents {
if let Some(key) = obj.key {
// Strip the prefix from the key
if let Some(stripped) = key.strip_prefix(&self.prefix) {
keys.push(stripped.to_string());
}
}
}
}
if response.is_truncated.unwrap_or(false) {
continuation_token = response.next_continuation_token;
} else {
break;
}
}
Ok(keys)
}
}
/// Builder for configuring S3 storage.
///
/// Collects the bucket, credentials and optional endpoint settings, then
/// produces an [`S3Store`] via `build` (async) or `build_blocking` (sync).
pub struct S3StoreBuilder {
/// Bucket name. Required: `build` fails with a config error when unset.
pub(super) bucket: Option<String>,
/// Region handed to the SDK configuration when set.
pub(super) region: Option<String>,
/// Prefix for all object keys. Empty by default; the `prefix` setter
/// appends a trailing `/` to non-empty values that lack one.
pub(super) prefix: String,
/// Custom endpoint URL handed to the SDK configuration when set.
pub(super) endpoint_url: Option<String>,
/// Access key id. Required together with `secret_access_key`.
pub(super) access_key_id: Option<String>,
/// Secret access key. Required together with `access_key_id`.
pub(super) secret_access_key: Option<String>,
/// Optional session token passed along with the static credentials.
pub(super) session_token: Option<String>,
/// Use path-style bucket addressing (`endpoint/bucket/key`) instead of
/// virtual-hosted style (`bucket.endpoint/key`). Required for MinIO and
/// most non-AWS S3-compatible services.
pub(super) force_path_style: bool,
}
impl S3StoreBuilder {
    /// Start from an empty configuration: nothing set, empty key prefix,
    /// path-style addressing disabled.
    pub fn new() -> Self {
        Self {
            bucket: None,
            region: None,
            endpoint_url: None,
            access_key_id: None,
            secret_access_key: None,
            session_token: None,
            prefix: String::new(),
            force_path_style: false,
        }
    }

    /// Choose the bucket the store operates on (required).
    pub fn bucket(self, bucket: impl Into<String>) -> Self {
        Self {
            bucket: Some(bucket.into()),
            ..self
        }
    }

    /// Choose the region passed to the SDK configuration.
    pub fn region(self, region: impl Into<String>) -> Self {
        Self {
            region: Some(region.into()),
            ..self
        }
    }

    /// Choose the key prefix shared by every object in the bucket.
    ///
    /// A non-empty prefix that lacks a trailing `/` gets one appended; an
    /// empty prefix is kept empty.
    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
        let mut normalized: String = prefix.into();
        if !normalized.is_empty() && !normalized.ends_with('/') {
            normalized.push('/');
        }
        self.prefix = normalized;
        self
    }

    /// Point the client at a custom endpoint URL (MinIO and similar).
    pub fn endpoint_url(self, url: impl Into<String>) -> Self {
        Self {
            endpoint_url: Some(url.into()),
            ..self
        }
    }

    /// Provide the access key id (required together with the secret key).
    pub fn access_key_id(self, key: impl Into<String>) -> Self {
        Self {
            access_key_id: Some(key.into()),
            ..self
        }
    }

    /// Provide the secret access key (required together with the key id).
    pub fn secret_access_key(self, key: impl Into<String>) -> Self {
        Self {
            secret_access_key: Some(key.into()),
            ..self
        }
    }

    /// Provide a session token for temporary credentials.
    pub fn session_token(self, token: impl Into<String>) -> Self {
        Self {
            session_token: Some(token.into()),
            ..self
        }
    }

    /// Toggle path-style bucket addressing (`endpoint/bucket/key`) in place
    /// of virtual-hosted style (`bucket.endpoint/key`).
    ///
    /// Typically needed for MinIO, Ceph RGW, and other non-AWS
    /// S3-compatible services. When enabled, `build` forwards the flag to
    /// the SDK configuration whether or not an `endpoint_url` was set.
    pub fn force_path_style(self, enable: bool) -> Self {
        Self {
            force_path_style: enable,
            ..self
        }
    }

    /// Build the S3 store from synchronous code.
    ///
    /// Same result as [`Self::build`], but usable where `.await` is not
    /// available, including sync code that is itself running inside a
    /// Tokio runtime, where blocking on `build()` via the current handle
    /// would panic.
    ///
    /// The `build()` future is driven on a [`runtime_bridge::RuntimeBridge`]
    /// created just for this call and dropped when the call returns. The
    /// returned [`S3Store`] starts with its own empty bridge slot, which is
    /// filled lazily on first synchronous use.
    ///
    /// `Repository::open` uses this entry point to construct an S3-backed
    /// store from sync code.
    ///
    /// # Errors
    ///
    /// `StoreError::Config` when the bridge cannot be spawned or reports a
    /// failure, plus anything [`Self::build`] returns.
    pub fn build_blocking(self) -> crate::store::Result<S3Store> {
        let bridge = match RuntimeBridge::with_thread_name("heddle-s3-builder-bridge") {
            Ok(spawned) => spawned,
            Err(err) => {
                return Err(crate::store::StoreError::Config(format!(
                    "S3 store: spawn worker thread for builder: {err}"
                )));
            }
        };
        match bridge.block_on(self.build()) {
            // The bridge ran the future to completion; pass its result on.
            Ok(build_result) => build_result,
            Err(err) => Err(crate::store::StoreError::Config(format!(
                "S3 store: builder runtime bridge: {err}"
            ))),
        }
    }

    /// Build the S3 store.
    ///
    /// Validates the required settings, assembles an SDK client from static
    /// credentials, and issues a `head_bucket` request so an inaccessible
    /// bucket is reported here rather than on first use.
    ///
    /// # Errors
    ///
    /// `StoreError::Config` when the bucket name is missing, when either
    /// half of the key pair is missing, or when the `head_bucket` probe
    /// fails.
    pub async fn build(self) -> crate::store::Result<S3Store> {
        let Self {
            bucket,
            region,
            prefix,
            endpoint_url,
            access_key_id,
            secret_access_key,
            session_token,
            force_path_style,
        } = self;

        let Some(bucket) = bucket else {
            return Err(crate::store::StoreError::Config(
                "S3 bucket name is required".to_string(),
            ));
        };
        let (access_key_id, secret_access_key) = match (access_key_id, secret_access_key) {
            (Some(id), Some(secret)) => (id, secret),
            _ => {
                return Err(crate::store::StoreError::Config(
                    "S3 access_key_id and secret_access_key are required (set them in the \
                     server config file or via HEDDLE_SERVER_S3_ACCESS_KEY_ID / \
                     HEDDLE_SERVER_S3_SECRET_ACCESS_KEY, or AWS_ACCESS_KEY_ID / \
                     AWS_SECRET_ACCESS_KEY env vars)"
                        .to_string(),
                ));
            }
        };

        let credentials = aws_sdk_s3::config::Credentials::new(
            access_key_id,
            secret_access_key,
            session_token,
            None,
            "heddle-s3-store",
        );

        // Mandatory settings first, then each optional one only when set.
        let base_config = aws_sdk_s3::config::Builder::new()
            .behavior_version(BehaviorVersion::latest())
            .credentials_provider(credentials)
            .sleep_impl(TokioSleep::new());
        let with_region = match region {
            Some(name) => base_config.region(aws_sdk_s3::config::Region::new(name)),
            None => base_config,
        };
        let with_endpoint = match endpoint_url {
            Some(url) => with_region.endpoint_url(url),
            None => with_region,
        };
        let sdk_config = if force_path_style {
            with_endpoint.force_path_style(true)
        } else {
            with_endpoint
        };
        let client = Client::from_conf(sdk_config.build());

        // Probe the bucket before handing out the store.
        if let Err(e) = client.head_bucket().bucket(&bucket).send().await {
            return Err(crate::store::StoreError::Config(format!(
                "Failed to access S3 bucket '{}': {}",
                bucket, e
            )));
        }

        Ok(S3Store::new(client, bucket, prefix))
    }
}
impl Default for S3StoreBuilder {
fn default() -> Self {
Self::new()
}
}