Storage

Trait Storage 

Source
pub trait Storage: Send + Sync {
Show 27 methods // Required methods fn create_file<'life0, 'async_trait>( &'life0 self, name: String, description: Option<String>, templates: Vec<RequestTemplateInput>, ) -> Pin<Box<dyn Future<Output = Result<FileId>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn create_file_stream<'life0, 'async_trait, S>( &'life0 self, stream: S, ) -> Pin<Box<dyn Future<Output = Result<FileId>> + Send + 'async_trait>> where S: 'async_trait + Stream<Item = FileStreamItem> + Send + Unpin, Self: 'async_trait, 'life0: 'async_trait; fn get_file<'life0, 'async_trait>( &'life0 self, file_id: FileId, ) -> Pin<Box<dyn Future<Output = Result<File>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn list_files<'life0, 'async_trait>( &'life0 self, filter: FileFilter, ) -> Pin<Box<dyn Future<Output = Result<Vec<File>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn get_file_content<'life0, 'async_trait>( &'life0 self, file_id: FileId, ) -> Pin<Box<dyn Future<Output = Result<Vec<FileContentItem>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn get_file_content_stream( &self, file_id: FileId, offset: usize, search: Option<String>, ) -> Pin<Box<dyn Stream<Item = Result<FileContentItem>> + Send>>; fn get_file_template_stats<'life0, 'async_trait>( &'life0 self, file_id: FileId, ) -> Pin<Box<dyn Future<Output = Result<Vec<ModelTemplateStats>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn delete_file<'life0, 'async_trait>( &'life0 self, file_id: FileId, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn create_batch<'life0, 'async_trait>( &'life0 self, input: BatchInput, ) -> Pin<Box<dyn Future<Output = Result<Batch>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn get_batch<'life0, 'async_trait>( &'life0 self, batch_id: BatchId, ) -> Pin<Box<dyn Future<Output = Result<Batch>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn get_batch_status<'life0, 'async_trait>( &'life0 self, batch_id: BatchId, ) -> Pin<Box<dyn Future<Output = Result<BatchStatus>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn list_file_batches<'life0, 'async_trait>( &'life0 self, file_id: FileId, ) -> Pin<Box<dyn Future<Output = Result<Vec<BatchStatus>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn list_batches<'life0, 'async_trait>( &'life0 self, created_by: Option<String>, search: Option<String>, after: Option<BatchId>, limit: i64, ) -> Pin<Box<dyn Future<Output = Result<Vec<Batch>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn get_batch_by_output_file_id<'life0, 'async_trait>( &'life0 self, file_id: FileId, file_type: OutputFileType, ) -> Pin<Box<dyn Future<Output = Result<Option<Batch>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn get_batch_requests<'life0, 'async_trait>( &'life0 self, batch_id: BatchId, ) -> Pin<Box<dyn Future<Output = Result<Vec<AnyRequest>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn get_batch_results_stream( &self, batch_id: BatchId, offset: usize, search: Option<String>, status: Option<String>, ) -> Pin<Box<dyn Stream<Item = Result<BatchResultItem>> + Send>>; fn find_pending_escalation<'life0, 'async_trait>( &'life0 self, original_request_id: RequestId, ) -> Pin<Box<dyn Future<Output = Result<Option<RequestId>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn get_at_risk_batches<'life0, 'life1, 'async_trait>( &'life0 self, threshold_seconds: i64, allowed_states: &'life1 [RequestStateFilter], ) -> Pin<Box<dyn Future<Output = Result<HashMap<BatchId, usize>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn get_missed_sla_batches<'life0, 'life1, 'async_trait>( &'life0 self, allowed_states: &'life1 [RequestStateFilter], ) -> Pin<Box<dyn Future<Output = Result<HashMap<BatchId, usize>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn create_escalated_requests<'life0, 'life1, 'life2, 'life3, 'life4, 'async_trait>( &'life0 self, model: &'life1 str, threshold_seconds: i64, allowed_states: &'life2 [RequestStateFilter], model_override: Option<&'life3 str>, api_key_override: Option<&'life4 str>, ) -> Pin<Box<dyn Future<Output = Result<i64>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait, 'life4: 'async_trait; fn cancel_batch<'life0, 'async_trait>( &'life0 self, batch_id: BatchId, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn delete_batch<'life0, 'async_trait>( &'life0 self, batch_id: BatchId, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn retry_failed_requests<'life0, 'async_trait>( &'life0 self, ids: Vec<RequestId>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Result<()>>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn get_requests<'life0, 'async_trait>( &'life0 self, ids: Vec<RequestId>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Result<AnyRequest>>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn claim_requests<'life0, 'async_trait>( &'life0 self, limit: usize, daemon_id: DaemonId, ) -> Pin<Box<dyn Future<Output = Result<Vec<Request<Claimed>>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn persist<'life0, 'life1, 'async_trait, T>( &'life0 self, request: &'life1 Request<T>, ) -> Pin<Box<dyn Future<Output = Result<Option<RequestId>>> + Send + 'async_trait>> where AnyRequest: From<Request<T>>, T: 'async_trait + RequestState + Clone, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; // Provided method fn cancel_requests<'life0, 'async_trait>( &'life0 self, ids: Vec<RequestId>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Result<()>>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... }
}
Expand description

Storage trait for persisting and querying requests.

This trait provides atomic operations for request lifecycle management. The type system ensures valid state transitions, so implementations don’t need to validate them.

Required Methods§

Source

fn create_file<'life0, 'async_trait>( &'life0 self, name: String, description: Option<String>, templates: Vec<RequestTemplateInput>, ) -> Pin<Box<dyn Future<Output = Result<FileId>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create a new file with templates.

Source

fn create_file_stream<'life0, 'async_trait, S>( &'life0 self, stream: S, ) -> Pin<Box<dyn Future<Output = Result<FileId>> + Send + 'async_trait>>
where S: 'async_trait + Stream<Item = FileStreamItem> + Send + Unpin, Self: 'async_trait, 'life0: 'async_trait,

Create a new file with templates from a stream.

The stream yields FileStreamItem which can be either:

  • Metadata: File metadata (can appear anywhere, will be accumulated)
  • Template: Request templates (processed as they arrive)
Source

fn get_file<'life0, 'async_trait>( &'life0 self, file_id: FileId, ) -> Pin<Box<dyn Future<Output = Result<File>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get a file by ID.

Source

fn list_files<'life0, 'async_trait>( &'life0 self, filter: FileFilter, ) -> Pin<Box<dyn Future<Output = Result<Vec<File>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

List files with optional filtering.

Source

fn get_file_content<'life0, 'async_trait>( &'life0 self, file_id: FileId, ) -> Pin<Box<dyn Future<Output = Result<Vec<FileContentItem>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get all content for a file.

Source

fn get_file_content_stream( &self, file_id: FileId, offset: usize, search: Option<String>, ) -> Pin<Box<dyn Stream<Item = Result<FileContentItem>> + Send>>

Stream file content. Returns different content types based on the file’s purpose:

  • Regular files (purpose=‘batch’): RequestTemplateInput
  • Batch output files (purpose=‘batch_output’): BatchOutputItem
  • Batch error files (purpose=‘batch_error’): BatchErrorItem

The offset parameter allows skipping the first N lines (0-indexed). The search parameter filters results by custom_id (case-insensitive substring match).

Source

fn get_file_template_stats<'life0, 'async_trait>( &'life0 self, file_id: FileId, ) -> Pin<Box<dyn Future<Output = Result<Vec<ModelTemplateStats>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get aggregated statistics for request templates grouped by model. This is optimized for cost estimation - it only fetches model names and body sizes, avoiding the overhead of streaming full template data.

Returns a vector of per-model statistics including request count and total body bytes.

Source

fn delete_file<'life0, 'async_trait>( &'life0 self, file_id: FileId, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Delete a file (cascades to batches and executions).

Source

fn create_batch<'life0, 'async_trait>( &'life0 self, input: BatchInput, ) -> Pin<Box<dyn Future<Output = Result<Batch>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create a batch from a file’s current templates. This will spawn requests in the Pending state for all templates in the file.

Source

fn get_batch<'life0, 'async_trait>( &'life0 self, batch_id: BatchId, ) -> Pin<Box<dyn Future<Output = Result<Batch>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get a batch by ID.

Source

fn get_batch_status<'life0, 'async_trait>( &'life0 self, batch_id: BatchId, ) -> Pin<Box<dyn Future<Output = Result<BatchStatus>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get batch status.

Source

fn list_file_batches<'life0, 'async_trait>( &'life0 self, file_id: FileId, ) -> Pin<Box<dyn Future<Output = Result<Vec<BatchStatus>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

List all batches for a file.

Source

fn list_batches<'life0, 'async_trait>( &'life0 self, created_by: Option<String>, search: Option<String>, after: Option<BatchId>, limit: i64, ) -> Pin<Box<dyn Future<Output = Result<Vec<Batch>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

List batches with optional filtering by creator and cursor-based pagination. Returns batches sorted by created_at DESC. The after parameter is a cursor for pagination (returns batches created before this ID).

Source

fn get_batch_by_output_file_id<'life0, 'async_trait>( &'life0 self, file_id: FileId, file_type: OutputFileType, ) -> Pin<Box<dyn Future<Output = Result<Option<Batch>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get a batch by its output or error file ID.

Source

fn get_batch_requests<'life0, 'async_trait>( &'life0 self, batch_id: BatchId, ) -> Pin<Box<dyn Future<Output = Result<Vec<AnyRequest>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get all requests for a batch.

Source

fn get_batch_results_stream( &self, batch_id: BatchId, offset: usize, search: Option<String>, status: Option<String>, ) -> Pin<Box<dyn Stream<Item = Result<BatchResultItem>> + Send>>

Stream batch results with merged input/output data.

Returns a stream of BatchResultItem, each containing:

  • The original input body from the request template
  • The response body (for completed requests)
  • The error message (for failed requests)
  • The current status

Results are filtered to exclude superseded requests (those that lost the race to their escalated pair). This ensures exactly one result per input template.

§Arguments
  • batch_id: The batch to get results for
  • offset: Number of results to skip (for pagination)
  • search: Optional custom_id filter (case-insensitive substring match)
  • status: Optional status filter (completed, failed, pending, in_progress)
Source

fn find_pending_escalation<'life0, 'async_trait>( &'life0 self, original_request_id: RequestId, ) -> Pin<Box<dyn Future<Output = Result<Option<RequestId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Find a pending escalated request for a given original request.

When a request is escalated, both the original and escalated request race. This method finds the escalated request (if any) that was created from the given original request ID and is still in a pending/claimed/processing state.

This is much more efficient than get_batch_requests for finding a single escalated request, as it uses a targeted query instead of fetching all requests in the batch.

§Returns

The request ID of the pending escalated request, or None if not found.

Source

fn get_at_risk_batches<'life0, 'life1, 'async_trait>( &'life0 self, threshold_seconds: i64, allowed_states: &'life1 [RequestStateFilter], ) -> Pin<Box<dyn Future<Output = Result<HashMap<BatchId, usize>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get batches at risk of missing their SLA deadline.

Returns a map of batch IDs to the count of requests in that batch that are at risk of missing the SLA. Useful for logging SLA violations.

§Arguments
  • threshold_seconds: Time remaining threshold (e.g., 3600 for 1 hour)
  • allowed_states: List of request states to count
§Returns

HashMap mapping BatchId to the count of at-risk requests

Source

fn get_missed_sla_batches<'life0, 'life1, 'async_trait>( &'life0 self, allowed_states: &'life1 [RequestStateFilter], ) -> Pin<Box<dyn Future<Output = Result<HashMap<BatchId, usize>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get batches with requests that have already missed their SLA deadline.

Returns a count of requests per batch where the deadline has passed. Only considers batches that are not completed, failed, or cancelled.

§Arguments
  • allowed_states: List of request states to count
§Returns

HashMap mapping BatchId to the count of requests that missed SLA

Source

fn create_escalated_requests<'life0, 'life1, 'life2, 'life3, 'life4, 'async_trait>( &'life0 self, model: &'life1 str, threshold_seconds: i64, allowed_states: &'life2 [RequestStateFilter], model_override: Option<&'life3 str>, api_key_override: Option<&'life4 str>, ) -> Pin<Box<dyn Future<Output = Result<i64>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait, 'life4: 'async_trait,

Create escalated requests for at-risk requests in a single operation.

Creates escalated copies of all requests matching the criteria. Automatically skips requests that already have escalations.

Escalated requests:

  • Have the same template/body as the original (from request_templates)
  • Use the overridden model if provided, otherwise same model as original
  • Are marked with is_escalated = true (invisible to batch accounting)
  • Link back to original via escalated_from_request_id
  • Priority endpoint routing is handled by the daemon at request processing time

Both requests race through normal queue processing. First to complete wins.

§Arguments
  • model: The model to filter requests by (e.g., “gpt-4”)
  • threshold_seconds: Seconds since batch creation to consider at-risk
  • allowed_states: List of request states to escalate
  • model_override: Optional model name to use for escalated requests (e.g., “gpt-4-priority”)
  • api_key_override: Optional API key to use for escalated requests
§Returns

The number of escalated requests created

Source

fn cancel_batch<'life0, 'async_trait>( &'life0 self, batch_id: BatchId, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Cancel all pending/in-progress requests for a batch.

Source

fn delete_batch<'life0, 'async_trait>( &'life0 self, batch_id: BatchId, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Delete a batch and all its associated requests. This is a destructive operation that removes the batch and all request data.

Source

fn retry_failed_requests<'life0, 'async_trait>( &'life0 self, ids: Vec<RequestId>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Result<()>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Retry failed requests by resetting them to pending state.

This resets the specified failed requests to pending state with retry_attempt = 0, allowing them to be picked up by the daemon for reprocessing.

§Arguments
  • ids - Request IDs to retry
§Returns

A vector of results, one for each request ID. Each result indicates whether the retry succeeded or failed.

§Errors

Individual retry results may fail if:

  • Request ID doesn’t exist
  • Request is not in failed state
Source

fn get_requests<'life0, 'async_trait>( &'life0 self, ids: Vec<RequestId>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Result<AnyRequest>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get in progress requests by IDs.

Source

fn claim_requests<'life0, 'async_trait>( &'life0 self, limit: usize, daemon_id: DaemonId, ) -> Pin<Box<dyn Future<Output = Result<Vec<Request<Claimed>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Atomically claim pending requests for processing.

Source

fn persist<'life0, 'life1, 'async_trait, T>( &'life0 self, request: &'life1 Request<T>, ) -> Pin<Box<dyn Future<Output = Result<Option<RequestId>>> + Send + 'async_trait>>
where AnyRequest: From<Request<T>>, T: 'async_trait + RequestState + Clone, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Update an existing request’s state in storage.

Returns Some(request_id) if a racing pair was superseded (for cancellation purposes).

Provided Methods§

Source

fn cancel_requests<'life0, 'async_trait>( &'life0 self, ids: Vec<RequestId>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Result<()>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

The following methods are defined specifically for requests - i.e. independent of the files/batches they belong to.

Cancel one or more individual pending or in-progress requests.

Requests that have already completed or failed cannot be canceled. This is a best-effort operation - some requests may have already been processed.

Returns a result for each request ID indicating whether cancellation succeeded.

§Errors

Individual cancellation results may fail if:

  • Request ID doesn’t exist
  • Request is already in a terminal state (completed/failed)

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§