use crate::connection::ConnectionManager;
use crate::types::{Error, NatsConfig, Result};
use chrono::Utc;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::sync::Mutex;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectMeta {
pub name: String,
pub bucket: String,
pub size: usize,
pub sha256: String,
pub created: String,
pub description: Option<String>,
}
pub struct ShipObjectStore {
bucket_name: String,
conn: Arc<Mutex<ConnectionManager>>,
store: Arc<Mutex<Option<async_nats::jetstream::object_store::ObjectStore>>>,
}
impl ShipObjectStore {
pub fn new(bucket_name: impl Into<String>, config: NatsConfig) -> Self {
Self {
bucket_name: bucket_name.into(),
conn: Arc::new(Mutex::new(ConnectionManager::new(config))),
store: Arc::new(Mutex::new(None)),
}
}
pub async fn connect(&self) -> Result<()> {
let mut conn = self.conn.lock().await;
conn.connect().await?;
let obj_store = conn.ensure_object_store(&self.bucket_name).await?;
*self.store.lock().await = Some(obj_store);
Ok(())
}
fn store_ref(
guard: &Option<async_nats::jetstream::object_store::ObjectStore>,
) -> Result<&async_nats::jetstream::object_store::ObjectStore> {
guard.as_ref().ok_or(Error::NotConnected)
}
pub async fn put(
&self,
name: &str,
data: &[u8],
description: Option<&str>,
) -> Result<ObjectMeta> {
let sha256 = hex::encode(Sha256::digest(data));
let guard = self.store.lock().await;
let store = Self::store_ref(&guard)?;
let obj_meta = async_nats::jetstream::object_store::ObjectMetadata {
name: name.to_string(),
description: description.map(|s| s.to_string()),
..Default::default()
};
store
.put(obj_meta, &mut data.to_vec().as_slice())
.await
.map_err(|e| Error::JetStream(e.to_string()))?;
Ok(ObjectMeta {
name: name.to_string(),
bucket: self.bucket_name.clone(),
size: data.len(),
sha256,
created: Utc::now().to_rfc3339(),
description: description.map(|s| s.to_string()),
})
}
pub async fn put_file(
&self,
name: &str,
path: &std::path::Path,
description: Option<&str>,
) -> Result<ObjectMeta> {
let data = tokio::fs::read(path)
.await
.map_err(|e| Error::Connection(format!("file read error: {}", e)))?;
self.put(name, &data, description).await
}
pub async fn get(&self, name: &str) -> Result<Option<Vec<u8>>> {
let guard = self.store.lock().await;
let store = Self::store_ref(&guard)?;
match store.get(name).await {
Ok(mut obj) => {
let mut data = Vec::new();
obj.read_to_end(&mut data)
.await
.map_err(|e| Error::JetStream(e.to_string()))?;
Ok(Some(data))
}
Err(_) => Ok(None),
}
}
pub async fn delete(&self, name: &str) -> Result<()> {
let guard = self.store.lock().await;
let store = Self::store_ref(&guard)?;
store
.delete(name)
.await
.map_err(|e| Error::JetStream(e.to_string()))?;
Ok(())
}
pub async fn list(&self) -> Result<Vec<async_nats::jetstream::object_store::ObjectInfo>> {
let guard = self.store.lock().await;
let store = Self::store_ref(&guard)?;
let mut entries = store
.list()
.await
.map_err(|e| Error::JetStream(e.to_string()))?;
let mut result = Vec::new();
while let Some(entry) = entries.next().await {
if let Ok(info) = entry {
result.push(info);
}
}
Ok(result)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotInfo {
pub snapshot_id: String,
pub being_id: String,
pub reason: String,
pub created: String,
pub size: usize,
}
pub struct BeingSnapshots {
store: ShipObjectStore,
}
impl BeingSnapshots {
pub fn new(config: NatsConfig) -> Self {
Self {
store: ShipObjectStore::new("snapshots", config),
}
}
pub async fn connect(&self) -> Result<()> {
self.store.connect().await
}
pub async fn take(
&self,
being_id: &str,
state: &serde_json::Value,
reason: &str,
) -> Result<String> {
let now = Utc::now();
let snapshot_id = format!("{}_{}", being_id, now.format("%Y%m%d_%H%M%S"));
let envelope = serde_json::json!({
"being_id": being_id,
"reason": reason,
"state": state,
"created": now.to_rfc3339(),
});
let data = serde_json::to_vec(&envelope)?;
self.store.put(&snapshot_id, &data, Some(reason)).await?;
Ok(snapshot_id)
}
pub async fn restore(&self, snapshot_id: &str) -> Result<Option<serde_json::Value>> {
match self.store.get(snapshot_id).await? {
Some(data) => {
let envelope: serde_json::Value = serde_json::from_slice(&data)?;
Ok(envelope.get("state").cloned())
}
None => Ok(None),
}
}
pub async fn delete(&self, snapshot_id: &str) -> Result<()> {
self.store.delete(snapshot_id).await
}
}
pub struct ArtifactStore {
store: ShipObjectStore,
}
impl ArtifactStore {
pub fn new(config: NatsConfig) -> Self {
Self {
store: ShipObjectStore::new("artifacts", config),
}
}
pub async fn connect(&self) -> Result<()> {
self.store.connect().await
}
pub async fn store_artifact(
&self,
name: &str,
data: &[u8],
artifact_type: &str,
expedition: Option<&str>,
) -> Result<ObjectMeta> {
let desc = match expedition {
Some(exp) => format!("type={}, expedition={}", artifact_type, exp),
None => format!("type={}", artifact_type),
};
self.store.put(name, data, Some(&desc)).await
}
pub async fn get(&self, name: &str) -> Result<Option<Vec<u8>>> {
self.store.get(name).await
}
}
mod hex {
pub fn encode(bytes: impl AsRef<[u8]>) -> String {
bytes
.as_ref()
.iter()
.map(|b| format!("{:02x}", b))
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn sha256_hex_encode() {
let data = b"hello world";
let hash = hex::encode(Sha256::digest(data));
assert_eq!(
hash,
"b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
);
}
#[test]
fn object_meta_serialization() {
let meta = ObjectMeta {
name: "test.json".to_string(),
bucket: "artifacts".to_string(),
size: 42,
sha256: "abc123".to_string(),
created: "2026-03-19T00:00:00Z".to_string(),
description: Some("test artifact".to_string()),
};
let json = serde_json::to_string(&meta).unwrap();
let back: ObjectMeta = serde_json::from_str(&json).unwrap();
assert_eq!(back.name, "test.json");
assert_eq!(back.size, 42);
}
#[test]
fn snapshot_id_format() {
let being_id = "mini";
let now = Utc::now();
let id = format!("{}_{}", being_id, now.format("%Y%m%d_%H%M%S"));
assert!(id.starts_with("mini_"));
assert!(id.len() >= 20);
}
#[test]
fn snapshot_info_serialization() {
let info = SnapshotInfo {
snapshot_id: "mini_20260319_120000".to_string(),
being_id: "mini".to_string(),
reason: "checkpoint".to_string(),
created: "2026-03-19T12:00:00Z".to_string(),
size: 1024,
};
let json = serde_json::to_string(&info).unwrap();
let back: SnapshotInfo = serde_json::from_str(&json).unwrap();
assert_eq!(back.snapshot_id, "mini_20260319_120000");
}
#[test]
fn artifact_description_format() {
let desc = format!("type={}, expedition={}", "code", "EX-3293");
assert_eq!(desc, "type=code, expedition=EX-3293");
}
#[test]
fn artifact_description_no_expedition() {
let desc = format!("type={}", "log");
assert_eq!(desc, "type=log");
}
}