dog-blob
Production-ready blob storage infrastructure for DogRS applications

dog-blob provides streaming-first, resumable, range-friendly blob storage for DogRS applications with zero boilerplate. It's designed to eliminate all the routine media handling code that services shouldn't have to write.
Perfect for: Video streaming, file uploads, media processing, document storage, backup systems, and any application that needs reliable blob handling.
Features
- Streaming-first: Handle huge video files without buffering
- Multipart/resumable uploads: Built-in coordination for large files
- Range requests: First-class support for video/audio scrubbing
- Storage agnostic: Works with any backend (memory, filesystem, S3, etc.)
- Server agnostic: No HTTP coupling - works with any protocol
- Zero boilerplate: Services focus on business logic, not media mechanics
Table of Contents
- Quick Start
- Core Concepts
- Service Integration
- Architecture
- Storage Backends
- Advanced Features
- Configuration
- Comparison to feathers-blob
- Design Philosophy
- Real-World Examples
- API Reference
- Performance & Best Practices
- Error Handling
- Monitoring and Observability
- Examples
- Roadmap
- License
Quick Start
Add to your Cargo.toml:
[dependencies]
dog-blob = "0.1.0"
30-Second Example
use dog_blob::prelude::*;
use tokio_util::io::ReaderStream;
use std::io::Cursor;
#[tokio::main]
async fn main() -> BlobResult<()> {
let store = dog_blob::S3CompatibleStore::from_env()?;
let adapter = BlobAdapter::new(store, BlobConfig::default());
let ctx = BlobCtx::new("my-app".to_string())
.with_actor("user-123".to_string());
let data = b"Hello, world!";
let stream = ReaderStream::new(Cursor::new(data));
let put_request = BlobPut::new()
.with_content_type("text/plain")
.with_filename("hello.txt");
let receipt = adapter.put(ctx.clone(), put_request, Box::pin(stream)).await?;
println!("Uploaded! ID: {}", receipt.id);
let opened = adapter.open(ctx, receipt.id, None).await?;
println!("Downloaded {} bytes", opened.content_length());
Ok(())
}
Core Concepts
dog-blob is built around four key concepts:
BlobAdapter - The main interface you embed in your services
BlobStore - Pluggable storage backend (S3, filesystem, memory, etc.)
BlobCtx - Tenant/user context for multi-tenant applications
BlobReceipt - Portable metadata returned after successful storage
Service Integration
The key insight: BlobAdapter is infrastructure, not a service. You embed it in your services:
use dog_blob::prelude::*;
use bytes::Bytes;
use futures::stream;

pub struct MediaService {
    blobs: BlobAdapter,
}
impl MediaService {
pub async fn upload_photo(&self, tenant_id: String, data: Vec<u8>) -> Result<MediaResponse, Error> {
let ctx = BlobCtx::new(tenant_id);
let stream = stream::once(async { Ok(Bytes::from(data)) });
let receipt = self.blobs.put(ctx, BlobPut::new(), Box::pin(stream)).await?;
Ok(MediaResponse {
id: receipt.id.to_string(),
url: format!("/media/{}", receipt.id),
size: receipt.size_bytes,
})
}
}
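Construction is the only place the storage backend is visible to the service. A minimal wiring sketch, assuming the S3-compatible store described below and the MediaService shape above:

use dog_blob::{BlobAdapter, BlobConfig, BlobResult, S3CompatibleStore};

impl MediaService {
    pub fn new(blobs: BlobAdapter) -> Self {
        Self { blobs }
    }
}

// At application startup:
fn build_media_service() -> BlobResult<MediaService> {
    let store = S3CompatibleStore::from_env()?;
    Ok(MediaService::new(BlobAdapter::new(store, BlobConfig::default())))
}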
Architecture
dog-blob follows a clean separation of concerns:
┌─────────────────┐
│ Your Service │ ← Business logic only
├─────────────────┤
│ BlobAdapter │ ← Media coordination
├─────────────────┤
│ BlobStore │ ← Storage primitives
└─────────────────┘
Core Types
BlobAdapter: Main interface - embed this in your services
BlobStore: Storage backend trait (implement for new storage types)
BlobReceipt: Portable metadata returned after storage
BlobCtx: Tenant/user context for multi-tenant apps
Storage Backends
dog-blob supports multiple storage backends through the BlobStore trait:
S3-Compatible Store (Production Ready)
Works with AWS S3, MinIO, DigitalOcean Spaces, and other S3-compatible services:
use dog_blob::{S3CompatibleStore, S3Config, BlobAdapter, BlobConfig};
// Configure from environment variables
let store = S3CompatibleStore::from_env()?;

// Or configure explicitly
let config = S3Config::new("my-bucket")
    .with_region("us-west-2")
    .with_endpoint("https://s3.amazonaws.com")
    .with_credentials("access_key", "secret_key");
let store = S3CompatibleStore::new(config)?;
let adapter = BlobAdapter::new(store, BlobConfig::default());
Features:
- ✅ Native multipart uploads
- ✅ Signed URL generation
- ✅ Range request support
- ✅ Server-side encryption
- ✅ Cross-region replication
Memory Store (Testing & Development)
Perfect for unit tests and development:
use dog_blob::store::MemoryBlobStore;
let store = MemoryBlobStore::new();
let adapter = BlobAdapter::new(store, BlobConfig::default());
Features:
- ✅ Zero configuration
- ✅ Fast in-memory operations
- ✅ Automatic cleanup
- ❌ Not persistent (data lost on restart)
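Because the backend is swappable, service tests can exercise the real put/open path entirely in memory. A minimal sketch, assuming a tokio test runtime and the builder methods shown in the Quick Start:

use dog_blob::prelude::*;
use dog_blob::store::MemoryBlobStore;
use std::io::Cursor;
use tokio_util::io::ReaderStream;

#[tokio::test]
async fn put_then_open_round_trips() -> BlobResult<()> {
    let adapter = BlobAdapter::new(MemoryBlobStore::new(), BlobConfig::default());
    let ctx = BlobCtx::new("test-tenant".to_string());

    let payload = b"test payload".to_vec();
    let stream = ReaderStream::new(Cursor::new(payload));
    let put = BlobPut::new().with_content_type("text/plain");
    let receipt = adapter.put(ctx.clone(), put, Box::pin(stream)).await?;

    let opened = adapter.open(ctx, receipt.id, None).await?;
    assert_eq!(opened.content_length(), 12);
    Ok(())
}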
Custom Storage Implementation
Implement BlobStore for your own storage backend:
use dog_blob::{BlobStore, BlobResult, BlobCtx, BlobId, ByteRange, ByteStream};
use async_trait::async_trait;

pub struct MyCustomStore {
    // connection handles, client configuration, etc.
}
#[async_trait]
impl BlobStore for MyCustomStore {
async fn put(&self, ctx: BlobCtx, id: BlobId, stream: ByteStream) -> BlobResult<PutResult> {
todo!()
}
async fn get(&self, ctx: BlobCtx, id: BlobId, range: Option<ByteRange>) -> BlobResult<GetResult> {
todo!()
}
}
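Once the trait is implemented, the custom store drops into the adapter exactly like the built-in backends (a sketch; the remaining BlobStore methods would of course need real implementations):

use dog_blob::{BlobAdapter, BlobConfig};

let store = MyCustomStore { /* ... */ };
let adapter = BlobAdapter::new(store, BlobConfig::default());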
Advanced Features
Multipart/Resumable Uploads
dog-blob handles multipart uploads automatically for large files:
use dog_blob::prelude::*;
use tokio_util::io::ReaderStream;

let config = BlobConfig::new()
    .with_multipart_threshold(16 * 1024 * 1024)  // switch to multipart above 16 MiB
    .with_upload_rules(
        UploadRules::new()
            .with_part_size(8 * 1024 * 1024)     // 8 MiB parts
            .with_max_parts(10_000),
    );
let adapter = BlobAdapter::new(store, config);

let large_file = tokio::fs::File::open("video.mp4").await?;
let stream = ReaderStream::new(large_file);
let put_request = BlobPut::new()
.with_content_type("video/mp4")
.with_filename("my-video.mp4");
let receipt = adapter.put(ctx, put_request, Box::pin(stream)).await?;
Manual multipart control:
let session = adapter.begin_multipart(ctx.clone(), put_request).await?;
let part1 = adapter.upload_part(ctx.clone(), session.upload_id, 1, stream1).await?;
let part2 = adapter.upload_part(ctx.clone(), session.upload_id, 2, stream2).await?;
let receipt = adapter.complete_multipart(ctx, session.upload_id).await?;
The coordinator automatically handles:
- ✅ Native multipart (when storage supports it)
- ✅ Staged assembly (universal fallback)
- ✅ Part validation and ordering
- ✅ Cleanup on abort/failure
- ✅ Resume from partial uploads
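Building on the manual controls above, a common pattern is to abort the session whenever a part fails so no staged data is left behind. A minimal sketch, assuming the caller has already split the payload into part streams and that UploadId is cloneable (not confirmed by this README):

use dog_blob::prelude::*;

async fn upload_in_parts(
    adapter: &BlobAdapter,
    ctx: BlobCtx,
    put_request: BlobPut,
    parts: Vec<ByteStream>,
) -> BlobResult<BlobReceipt> {
    let session = adapter.begin_multipart(ctx.clone(), put_request).await?;

    for (i, part) in parts.into_iter().enumerate() {
        let part_number = (i + 1) as u32; // part numbers start at 1
        if let Err(e) = adapter
            .upload_part(ctx.clone(), session.upload_id.clone(), part_number, part)
            .await
        {
            // Clean up server-side state before surfacing the original error.
            adapter.abort_multipart(ctx, session.upload_id).await?;
            return Err(e);
        }
    }

    adapter.complete_multipart(ctx, session.upload_id).await
}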
Range Requests (Video Streaming)
Perfect for video/audio scrubbing and progressive downloads:
use dog_blob::{ByteRange, OpenedContent};
use futures::StreamExt;
let range = ByteRange::new(1024, Some(2048));
let opened = adapter.open(ctx, blob_id, Some(range)).await?;
let total_len = opened.content_length(); // capture before `content` is moved by the match
match opened.content {
    OpenedContent::Stream { mut stream, resolved_range } => {
        println!("Streaming bytes {}-{}",
            resolved_range.start,
            resolved_range.end.unwrap_or(total_len));
        while let Some(chunk) = stream.next().await {
            let bytes = chunk?;
            // forward `bytes` to the client, feed a decoder, etc.
        }
}
OpenedContent::SignedUrl { url, expires_at } => {
println!("Redirect to: {} (expires: {:?})", url, expires_at);
}
}
Common use cases:
- 🎥 Video seeking/scrubbing
- 🎵 Audio preview generation
- 📄 PDF page extraction
- 🖼️ Image thumbnail generation
- 📊 Large file sampling
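Because dog-blob is server-agnostic, mapping an HTTP Range header onto a ByteRange is done in your transport layer. A minimal sketch for the common `bytes=start-end` form (suffix ranges and multi-range headers are deliberately ignored):

use dog_blob::ByteRange;

/// Parse "bytes=1024-2048" or "bytes=1024-" into a ByteRange.
fn parse_range_header(header: &str) -> Option<ByteRange> {
    let spec = header.strip_prefix("bytes=")?;
    let (start, end) = spec.split_once('-')?;
    let start: u64 = start.parse().ok()?;
    let end = if end.is_empty() {
        None
    } else {
        Some(end.parse().ok()?)
    };
    Some(ByteRange::new(start, end))
}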
Metadata and Custom Fields
Store rich metadata alongside your blobs:
use serde_json::json;
let put_request = BlobPut::new()
.with_content_type("video/mp4")
.with_filename("presentation.mp4")
.with_metadata(json!({
"title": "Q4 Sales Presentation",
"duration_seconds": 1800,
"resolution": "1920x1080",
"tags": ["sales", "quarterly", "presentation"],
"created_by": "user-123",
"department": "sales"
}));
let receipt = adapter.put(ctx.clone(), put_request, stream).await?;
let info = adapter.info(ctx, receipt.id).await?;
println!("Video title: {}", info.metadata.custom["title"]);
Configuration
let config = BlobConfig::new()
    .with_max_blob_bytes(5 * 1024 * 1024 * 1024)  // 5 GiB per-blob limit
    .with_multipart_threshold(16 * 1024 * 1024)   // multipart above 16 MiB
    .with_upload_rules(
        UploadRules::new()
            .with_part_size(8 * 1024 * 1024)      // 8 MiB parts
            .with_max_parts(10_000),
    );
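As a sanity check on these numbers: 8 MiB parts times 10,000 parts gives a multipart ceiling of roughly 78 GiB, so with this configuration the 5 GiB max_blob_bytes limit is what actually caps blob size.

let part_size: u64 = 8 * 1024 * 1024;  // 8 MiB
let max_parts: u64 = 10_000;
assert_eq!(part_size * max_parts, 83_886_080_000); // ≈ 78 GiB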
Comparison to feathers-blob
| Feature             | feathers-blob       | dog-blob               |
|---------------------|---------------------|------------------------|
| Input model         | Buffer/data-URI     | Streaming ByteStream   |
| Large files         | Manual              | First-class            |
| Multipart           | External middleware | Built-in coordination  |
| Resumable           | Not native          | Native                 |
| Range reads         | App-implemented     | First-class API        |
| Storage abstraction | abstract-blob-store | Pluggable traits       |
| Metadata storage    | Service-specific    | Portable receipts      |
| Server coupling     | HTTP-centric        | Server-agnostic        |
Design Philosophy
- Media is infrastructure, not a service - BlobAdapter is a capability you embed
- Streaming everywhere - Never buffer entire files in memory
- Storage agnostic - Switch backends without changing service code
- Server agnostic - Works with HTTP, gRPC, CLI, background jobs
- Zero opinions - You decide metadata schema, ACL, business logic
Real-World Examples
Video Streaming Service
use dog_blob::prelude::*;
use serde_json::json;
pub struct VideoService {
blobs: BlobAdapter,
}
impl VideoService {
pub async fn upload_video(&self, user_id: String, video_stream: ByteStream) -> BlobResult<String> {
let ctx = BlobCtx::new("video-app".to_string())
.with_actor(user_id.clone());
let put_request = BlobPut::new()
.with_content_type("video/mp4")
.with_metadata(json!({
"uploaded_by": user_id,
"upload_time": chrono::Utc::now(),
"processing_status": "pending"
}));
let receipt = self.blobs.put(ctx, put_request, video_stream).await?;
Ok(receipt.id.to_string())
}
pub async fn stream_video_segment(&self, video_id: String, start_byte: u64, end_byte: Option<u64>) -> BlobResult<ByteStream> {
let ctx = BlobCtx::new("video-app".to_string());
let blob_id = BlobId(video_id);
let range = ByteRange::new(start_byte, end_byte);
let opened = self.blobs.open(ctx, blob_id, Some(range)).await?;
match opened.content {
OpenedContent::Stream { stream, .. } => Ok(stream),
OpenedContent::SignedUrl { url, .. } => {
Err(BlobError::RedirectToUrl(url))
}
}
}
}
Document Management System
pub struct DocumentService {
blobs: BlobAdapter,
}
impl DocumentService {
pub async fn store_document(&self, tenant_id: String, doc: Document) -> BlobResult<DocumentReceipt> {
let ctx = BlobCtx::new(tenant_id.clone())
.with_actor(doc.created_by.clone());
let put_request = BlobPut::new()
.with_content_type(&doc.mime_type)
.with_filename(&doc.filename)
.with_metadata(json!({
"document_type": doc.doc_type,
"tags": doc.tags,
"department": doc.department,
"confidentiality": doc.confidentiality_level,
"retention_policy": doc.retention_days
}));
let receipt = self.blobs.put(ctx, put_request, doc.content_stream).await?;
Ok(DocumentReceipt {
id: receipt.id.to_string(),
size: receipt.size_bytes,
checksum: receipt.etag,
storage_class: receipt.storage_class,
})
}
pub async fn generate_preview(&self, doc_id: String, page: u32) -> BlobResult<ByteStream> {
let preview_range = ByteRange::new(0, Some(64 * 1024));
let ctx = BlobCtx::new("doc-service".to_string());
let opened = self.blobs.open(ctx, BlobId(doc_id), Some(preview_range)).await?;
match opened.content {
OpenedContent::Stream { stream, .. } => {
Ok(stream)
}
_ => Err(BlobError::UnsupportedOperation("Preview generation requires streaming".into()))
}
}
}
Backup Service with Chunked Uploads
pub struct BackupService {
blobs: BlobAdapter,
}
impl BackupService {
pub async fn backup_large_database(&self, db_path: &str) -> BlobResult<String> {
let ctx = BlobCtx::new("backup-service".to_string());
let config = BlobConfig::new()
    .with_multipart_threshold(100 * 1024 * 1024)  // multipart above 100 MiB
    .with_upload_rules(
        UploadRules::new()
            .with_part_size(50 * 1024 * 1024)     // 50 MiB parts
            .with_max_parts(2000),
    );
let put_request = BlobPut::new()
.with_content_type("application/octet-stream")
.with_filename(&format!("backup-{}.sql", chrono::Utc::now().format("%Y%m%d")))
.with_metadata(json!({
"backup_type": "full",
"database": "production",
"compression": "gzip",
"encrypted": true
}));
let file = tokio::fs::File::open(db_path).await?;
let stream = ReaderStream::new(file);
let receipt = self.blobs.put(ctx, put_request, Box::pin(stream)).await?;
Ok(receipt.id.to_string())
}
}
API Reference
Core Types
BlobAdapter
The main interface for blob operations.
impl BlobAdapter {
pub async fn put(&self, ctx: BlobCtx, request: BlobPut, stream: ByteStream) -> BlobResult<BlobReceipt>;
pub async fn open(&self, ctx: BlobCtx, id: BlobId, range: Option<ByteRange>) -> BlobResult<OpenedBlob>;
pub async fn info(&self, ctx: BlobCtx, id: BlobId) -> BlobResult<BlobInfo>;
pub async fn delete(&self, ctx: BlobCtx, id: BlobId) -> BlobResult<()>;
pub async fn begin_multipart(&self, ctx: BlobCtx, request: BlobPut) -> BlobResult<UploadSession>;
pub async fn upload_part(&self, ctx: BlobCtx, upload_id: UploadId, part_number: u32, stream: ByteStream) -> BlobResult<PartReceipt>;
pub async fn complete_multipart(&self, ctx: BlobCtx, upload_id: UploadId) -> BlobResult<BlobReceipt>;
pub async fn abort_multipart(&self, ctx: BlobCtx, upload_id: UploadId) -> BlobResult<()>;
}
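The earlier examples focus on put and open; delete and abort_multipart round out the lifecycle. A minimal cleanup sketch (ctx and receipt as in the Quick Start; upload_id as returned by begin_multipart):

// Remove a blob that is no longer referenced by the application.
adapter.delete(ctx.clone(), receipt.id).await?;

// Abandon an in-progress multipart session so staged parts can be reclaimed.
adapter.abort_multipart(ctx, upload_id).await?;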
BlobCtx
Context for multi-tenant operations.
impl BlobCtx {
pub fn new(tenant_id: String) -> Self;
pub fn with_actor(self, actor_id: String) -> Self;
pub fn with_trace_id(self, trace_id: String) -> Self;
}
BlobPut
Request builder for blob uploads.
impl BlobPut {
pub fn new() -> Self;
pub fn with_content_type(self, content_type: &str) -> Self;
pub fn with_filename(self, filename: &str) -> Self;
pub fn with_metadata(self, metadata: serde_json::Value) -> Self;
pub fn with_storage_class(self, class: &str) -> Self;
}
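The builder methods above compose freely; with_storage_class is the only one not shown elsewhere in this README (the "STANDARD_IA" value below is illustrative and backend-specific):

let put_request = BlobPut::new()
    .with_content_type("application/pdf")
    .with_filename("contract.pdf")
    .with_storage_class("STANDARD_IA") // passed through to the storage backend
    .with_metadata(serde_json::json!({ "department": "legal" }));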
Performance & Best Practices
Streaming Best Practices
- Always stream large files - Never load entire files into memory
- Use appropriate buffer sizes - 8MB chunks work well for most cases
- Enable multipart for large files - Set threshold around 16-100MB
- Implement backpressure - Don't overwhelm slower storage backends
// Good: stream directly from disk
let file = tokio::fs::File::open("large-video.mp4").await?;
let stream = ReaderStream::new(file);
let receipt = adapter.put(ctx, request, Box::pin(stream)).await?;

// Avoid: this buffers the whole file in memory
let data = tokio::fs::read("large-video.mp4").await?;
let stream = stream::once(async { Ok(Bytes::from(data)) });
let receipt = adapter.put(ctx, request, Box::pin(stream)).await?;
Configuration Tuning
// Large media workloads: bigger parts, fewer requests
let config = BlobConfig::new()
    .with_multipart_threshold(50 * 1024 * 1024)
    .with_upload_rules(
        UploadRules::new()
            .with_part_size(25 * 1024 * 1024)
            .with_max_parts(4000),
    );

// Smaller files or memory-constrained environments: lower threshold, smaller parts
let config = BlobConfig::new()
    .with_multipart_threshold(10 * 1024 * 1024)
    .with_upload_rules(
        UploadRules::new()
            .with_part_size(5 * 1024 * 1024)
            .with_max_parts(1000),
    );
Error Handling
use dog_blob::{BlobError, BlobResult};
use std::time::Duration;
match adapter.put(ctx, request, stream).await {
Ok(receipt) => {
println!("Upload successful: {}", receipt.id);
}
Err(BlobError::StorageQuotaExceeded) => {
return Err("Storage quota exceeded".into());
}
Err(BlobError::InvalidContentType(ct)) => {
return Err(format!("Invalid content type: {}", ct).into());
}
Err(BlobError::NetworkError(e)) => {
    // Transient failure: back off before retrying (see the retry sketch below)
    eprintln!("Network error, will retry: {}", e);
    tokio::time::sleep(Duration::from_secs(1)).await;
}
Err(e) => {
eprintln!("Upload failed: {}", e);
}
}
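One subtlety when retrying failed uploads: a ByteStream can only be consumed once, so a retry helper needs to recreate the stream for every attempt. A minimal sketch using a caller-supplied closure; the backoff policy is illustrative, and it assumes BlobPut implements Clone (not confirmed by this README):

use std::time::Duration;
use dog_blob::prelude::*;
use dog_blob::BlobError;

async fn put_with_retry<F>(
    adapter: &BlobAdapter,
    ctx: BlobCtx,
    request: BlobPut,
    mut make_stream: F, // recreates the stream for each attempt
    max_attempts: u32,
) -> BlobResult<BlobReceipt>
where
    F: FnMut() -> ByteStream,
{
    let mut attempt = 0;
    loop {
        attempt += 1;
        match adapter.put(ctx.clone(), request.clone(), make_stream()).await {
            Err(BlobError::NetworkError(_)) if attempt < max_attempts => {
                // Simple exponential backoff between attempts.
                tokio::time::sleep(Duration::from_secs(1 << attempt)).await;
            }
            other => return other,
        }
    }
}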
Monitoring and Observability
use tracing::{info, error, instrument};
impl VideoService {
#[instrument(skip(self, video_stream))]
pub async fn upload_video(&self, user_id: String, video_stream: ByteStream) -> BlobResult<String> {
info!("Starting video upload for user: {}", user_id);
let start = std::time::Instant::now();
let ctx = BlobCtx::new("video-app".to_string()).with_actor(user_id.clone());
let request = BlobPut::new().with_content_type("video/mp4");
let result = self.blobs.put(ctx, request, video_stream).await;
match &result {
Ok(receipt) => {
info!(
"Video upload completed in {:?}, size: {} bytes, id: {}",
start.elapsed(),
receipt.size_bytes,
receipt.id
);
}
Err(e) => {
error!("Video upload failed after {:?}: {}", start.elapsed(), e);
}
}
result.map(|r| r.id.to_string())
}
}
Examples
The dog-examples/ directory contains real-world implementations:
music-blobs/ - Complete music streaming application with waveform visualization
- Demonstrates S3-compatible storage integration
- Shows multipart upload handling for large audio files
- Implements range requests for audio scrubbing
- Features real-time waveform generation using Symphonia audio decoding
- Production-ready service architecture with dog-blob integration
Roadmap
License
MIT OR Apache-2.0
Made by Jitpomi