fusillade 15.0.0

A daemon implementation for sending batched LLM requests with efficient request coalescing
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
//! Main traits for the batching system.
//!
//! This module defines the `Storage`, `DaemonStorage`, and `DaemonExecutor` traits, which provide
//! the interface for persisting requests, creating files, launching batches, tracking daemon
//! lifecycle, and checking execution status.

use crate::batch::{
    Batch, BatchId, BatchInput, BatchStatus, File, FileContentItem, FileFilter, FileId,
    FileStreamItem, FileStreamResult, ListBatchesFilter, OutputFileType, RequestTemplateInput,
};
use crate::daemon::{AnyDaemonRecord, DaemonRecord, DaemonState, DaemonStatus};
use crate::error::Result;
use crate::http::HttpClient;
use crate::request::{AnyRequest, Claimed, DaemonId, Request, RequestId, RequestState};
use async_trait::async_trait;
use futures::stream::Stream;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use tokio::task::JoinHandle;

#[cfg(feature = "postgres")]
pub mod postgres;
mod utils;

/// Storage trait for persisting and querying requests.
///
/// This trait provides atomic operations for request lifecycle management.
/// The type system ensures valid state transitions, so implementations don't
/// need to validate them.
#[async_trait]
pub trait Storage: Send + Sync {
    /// Create a new file with templates.
    async fn create_file(
        &self,
        name: String,
        description: Option<String>,
        templates: Vec<RequestTemplateInput>,
    ) -> Result<FileId>;

    /// Create a new file with templates from a stream.
    ///
    /// The stream yields FileStreamItem which can be either:
    /// - Metadata: File metadata (can appear anywhere, will be accumulated)
    /// - Template: Request templates (processed as they arrive)
    /// - Abort: Producer initiated rollback without treating it as a fusillade error
    async fn create_file_stream<S: Stream<Item = FileStreamItem> + Send + Unpin>(
        &self,
        stream: S,
    ) -> Result<FileStreamResult>;

    /// Get a file by ID.
    async fn get_file(&self, file_id: FileId) -> Result<File>;

    /// Get a file by ID from the primary pool for read-after-write consistency.
    ///
    /// Use this immediately after creating or modifying a file to ensure you read
    /// the latest committed data. For normal reads, use `get_file()` which may use
    /// read replicas for better performance.
    async fn get_file_from_primary_pool(&self, file_id: FileId) -> Result<File>;

    /// List files with optional filtering.
    async fn list_files(&self, filter: FileFilter) -> Result<Vec<File>>;

    /// Get all content for a file.
    async fn get_file_content(&self, file_id: FileId) -> Result<Vec<FileContentItem>>;

    /// Stream file content.
    /// Returns different content types based on the file's purpose:
    /// - Regular files (purpose='batch'): RequestTemplateInput
    /// - Batch output files (purpose='batch_output'): BatchOutputItem
    /// - Batch error files (purpose='batch_error'): BatchErrorItem
    ///
    /// # Arguments
    /// * `file_id` - The file ID to stream content from
    /// * `offset` - Number of lines to skip (0-indexed)
    /// * `search` - Optional filter by custom_id (case-insensitive substring match)
    fn get_file_content_stream(
        &self,
        file_id: FileId,
        offset: usize,
        search: Option<String>,
    ) -> Pin<Box<dyn Stream<Item = Result<FileContentItem>> + Send>>;

    /// Get aggregated statistics for request templates grouped by model.
    /// This is optimized for cost estimation - it only fetches model names and body sizes,
    /// avoiding the overhead of streaming full template data.
    ///
    /// Returns a vector of per-model statistics including request count and total body bytes.
    async fn get_file_template_stats(
        &self,
        file_id: FileId,
    ) -> Result<Vec<crate::batch::ModelTemplateStats>>;

    /// Delete a file (cascades to batches and executions).
    async fn delete_file(&self, file_id: FileId) -> Result<()>;

    /// Create a batch from a file's current templates.
    ///
    /// Convenience method that calls [`create_batch_record`] to insert the batch
    /// row, then [`populate_batch`] to copy templates into requests. Returns the
    /// fully-populated batch.
    async fn create_batch(&self, input: BatchInput) -> Result<Batch>;

    /// Create a batch record with virtual output/error files, without populating requests.
    ///
    /// Inserts the batch row and creates virtual output/error files so their IDs
    /// are available in the API response immediately.
    /// Returns a batch in `"validating"` status (`requests_started_at` is NULL).
    /// `total_requests` will be set from `input.total_requests` if provided, or `0` otherwise.
    /// Use [`populate_batch`] to copy templates into requests afterward.
    async fn create_batch_record(&self, input: BatchInput) -> Result<Batch>;

    /// Populate an existing batch with requests from its file's templates.
    ///
    /// Copies templates into the requests table and updates the batch with
    /// total_requests and requests_started_at.
    /// If the file has no templates, returns a [`ValidationError`](crate::FusilladeError::ValidationError)
    /// and the caller is responsible for marking the batch as failed.
    async fn populate_batch(&self, batch_id: BatchId, file_id: FileId) -> Result<()>;

    /// Get a batch by ID.
    ///
    /// # Arguments
    /// * `batch_id` - The batch ID to retrieve
    async fn get_batch(&self, batch_id: BatchId) -> Result<Batch>;

    /// Get batch status.
    ///
    /// # Arguments
    /// * `batch_id` - The batch ID to retrieve status for
    async fn get_batch_status(&self, batch_id: BatchId) -> Result<BatchStatus>;

    /// List all batches for a file.
    ///
    /// # Arguments
    /// * `file_id` - The file ID to list batches for
    async fn list_file_batches(&self, file_id: FileId) -> Result<Vec<BatchStatus>>;

    /// List batches with optional filtering and cursor-based pagination.
    /// Returns batches sorted by created_at DESC (or active-first when `active_first` is set).
    ///
    /// See [`ListBatchesFilter`] for available filter options including:
    /// - `created_by` - Filter by batch creator user ID
    /// - `search` - Case-insensitive substring match against metadata JSON text,
    ///   input filename, or batch ID
    /// - `after` / `limit` - Cursor-based pagination (limit defaults to 100 if not set)
    /// - `api_key_ids` - Filter by API key UUID(s) that created the batch (for per-member attribution)
    /// - `status` - Filter by batch status. Supported values:
    ///   `"in_progress"`, `"completed"`, `"failed"`, `"cancelled"`, `"expired"`.
    ///   `"in_progress"` covers all non-terminal batches (including validating and finalizing
    ///   sub-states). `"cancelled"` includes batches that are still cancelling.
    ///   `"expired"` matches batches with SLA issues: in-progress past their deadline,
    ///   or terminal batches that finished after their deadline.
    ///   Unrecognized values return an error.
    /// - `created_after` / `created_before` - Time range filter on batch creation timestamp
    /// - `active_first` - When true, sorts active batches before terminal ones
    ///   (completed, failed, cancelled, or cancelling), with each group sorted by
    ///   created_at DESC. Cancelling batches are terminal because cancel_batch sets
    ///   both timestamps atomically. Cursor pagination respects this ordering.
    async fn list_batches(&self, filter: ListBatchesFilter) -> Result<Vec<Batch>>;

    /// Get a batch by its output or error file ID.
    async fn get_batch_by_output_file_id(
        &self,
        file_id: FileId,
        file_type: OutputFileType,
    ) -> Result<Option<Batch>>;

    /// Get all requests for a batch.
    async fn get_batch_requests(&self, batch_id: BatchId) -> Result<Vec<AnyRequest>>;

    /// Stream batch results with merged input/output data.
    ///
    /// Returns a stream of BatchResultItem, each containing:
    /// - The original input body from the request template
    /// - The response body (for completed requests)
    /// - The error message (for failed requests)
    /// - The current status
    ///
    /// # Arguments
    /// * `batch_id` - The batch to get results for
    /// * `offset` - Number of results to skip (for pagination)
    /// * `search` - Optional custom_id filter (case-insensitive substring match)
    /// * `status` - Optional status filter (completed, failed, pending, in_progress)
    fn get_batch_results_stream(
        &self,
        batch_id: BatchId,
        offset: usize,
        search: Option<String>,
        status: Option<String>,
    ) -> Pin<Box<dyn Stream<Item = Result<crate::batch::BatchResultItem>> + Send>>;

    /// Given a list of batch IDs, return those that have been cancelled (cancelling_at IS NOT NULL).
    async fn get_cancelled_batch_ids(&self, batch_ids: &[BatchId]) -> Result<Vec<BatchId>>;

    /// Cancel all pending/in-progress requests for a batch.
    async fn cancel_batch(&self, batch_id: BatchId) -> Result<()>;

    /// Delete a batch and all its associated requests.
    /// This is a destructive operation that removes the batch and all request data.
    async fn delete_batch(&self, batch_id: BatchId) -> Result<()>;

    /// Retry failed requests by resetting them to pending state.
    ///
    /// This resets the specified failed requests to pending state with retry_attempt = 0,
    /// allowing them to be picked up by the daemon for reprocessing.
    ///
    /// # Arguments
    /// * `ids` - Request IDs to retry
    ///
    /// # Returns
    /// A vector of results, one for each request ID. Each result indicates whether
    /// the retry succeeded or failed.
    ///
    /// # Errors
    /// Individual retry results may fail if:
    /// - Request ID doesn't exist
    /// - Request is not in failed state
    async fn retry_failed_requests(&self, ids: Vec<RequestId>) -> Result<Vec<Result<()>>>;

    /// Retry all failed requests for a batch in a single database operation.
    ///
    /// This is much more efficient than `retry_failed_requests` when retrying all failed
    /// requests for a batch, as it performs a single UPDATE query instead of loading
    /// all requests into memory.
    ///
    /// # Returns
    /// The number of requests that were retried.
    async fn retry_failed_requests_for_batch(&self, batch_id: BatchId) -> Result<u64>;

    /// Get request counts grouped by model and completion window.
    ///
    /// - `windows`: Vec of (label, seconds)
    /// - `states`: request states to include (e.g. ["pending"], or ["pending","claimed","processing"])
    /// - `model_filter`: optional model whitelist (empty = all)
    /// - `strict`: bool. For critical/sensitive operations, set 'true' to use the write pool and avoid read lags.
    ///
    /// This excludes:
    /// - Requests without a template_id
    /// - Requests in batches being cancelled
    async fn get_pending_request_counts_by_model_and_completion_window(
        &self,
        windows: &[(String, i64)],
        states: &[String],
        model_filter: &[String],
        strict: bool,
    ) -> Result<HashMap<String, HashMap<String, i64>>>;

    /// Cancel one or more individual pending or in-progress requests.
    ///
    /// Requests that have already completed or failed cannot be cancelled.
    /// This is a best-effort operation - some requests may have already been processed.
    ///
    /// Returns a result for each request ID (in input order) indicating whether
    /// cancellation succeeded. A failure to cancel one request does not abort
    /// the remaining cancellations.
    ///
    /// # Errors
    /// Individual cancellation results may fail if:
    /// - Request ID doesn't exist
    /// - Request is already in a terminal state (completed/failed)
    /// - The underlying storage rejected the state transition
    ///
    /// A top-level `Err` is returned only if fetching a request's current
    /// state from storage fails.
    #[tracing::instrument(skip(self, ids), fields(count = ids.len()))]
    async fn cancel_requests(&self, ids: Vec<RequestId>) -> Result<Vec<Result<()>>> {
        tracing::debug!(count = ids.len(), "Cancelling requests");

        // One entry per input id, in input order.
        let mut results = Vec::with_capacity(ids.len());

        for id in ids {
            // Look up each id individually so the per-id pairing of results is
            // guaranteed regardless of how an implementation orders batched lookups.
            let get_results = self.get_requests(vec![id]).await?;
            let request_result = get_results
                .into_iter()
                .next()
                .expect("get_requests must return exactly one result per requested id");

            let result = match request_result {
                Ok(any_request) => match any_request {
                    // Cancellable states. Record a per-id failure instead of
                    // propagating, so one failed cancel doesn't abort the rest —
                    // this matches the documented per-request result contract.
                    AnyRequest::Pending(req) => req.cancel(self).await.map(|_| ()),
                    AnyRequest::Claimed(req) => req.cancel(self).await.map(|_| ()),
                    AnyRequest::Processing(req) => req.cancel(self).await.map(|_| ()),
                    AnyRequest::Completed(_) | AnyRequest::Failed(_) | AnyRequest::Canceled(_) => {
                        Err(crate::error::FusilladeError::InvalidState(
                            id,
                            "terminal state".to_string(),
                            "cancellable state".to_string(),
                        ))
                    }
                },
                Err(e) => Err(e),
            };

            results.push(result);
        }

        Ok(results)
    }

    /// Get in progress requests by IDs.
    async fn get_requests(&self, ids: Vec<RequestId>) -> Result<Vec<Result<AnyRequest>>>;

    // These methods are used by the DaemonExecutor for pulling requests, and then persisting their
    // states as they iterate through them

    /// Atomically claim pending requests for processing.
    ///
    /// `available_capacity` maps model names to the number of permits the daemon
    /// is currently holding for that model. Only models present in this map will
    /// be claimed — this is the authoritative set of models to process.
    ///
    /// `user_active_counts` maps user identifiers to their current number of
    /// in-flight requests across all models. Used to prioritise users with fewer
    /// active requests for per-user fair scheduling. Pass an empty map to disable
    /// user-level prioritisation (falls back to deadline-only ordering).
    ///
    /// Implementations may blend user-fairness with SLA urgency (batch deadline
    /// proximity) via `DaemonConfig::urgency_weight`. See the PostgreSQL
    /// implementation for the composite scoring formula.
    async fn claim_requests(
        &self,
        limit: usize,
        daemon_id: DaemonId,
        available_capacity: &std::collections::HashMap<String, usize>,
        user_active_counts: &std::collections::HashMap<String, usize>,
    ) -> Result<Vec<Request<Claimed>>>;

    /// Update an existing request's state in storage.
    ///
    /// Returns `Some(request_id)` if a racing pair was superseded (for cancellation purposes).
    async fn persist<T: RequestState + Clone>(
        &self,
        request: &Request<T>,
    ) -> Result<Option<RequestId>>
    where
        AnyRequest: From<Request<T>>;
}

/// Daemon lifecycle persistence.
///
/// This trait provides storage operations for tracking daemon state,
/// including registration, heartbeat updates, and graceful shutdown.
/// It also hosts the periodic data-hygiene hook used by the daemon's
/// purge task ([`purge_orphaned_rows`](Self::purge_orphaned_rows)).
#[async_trait]
pub trait DaemonStorage: Send + Sync {
    /// Persist daemon state update.
    ///
    /// This is a low-level method used by state transition methods.
    /// The type parameter `T` ensures type-safe state transitions: only
    /// records whose state type converts into [`AnyDaemonRecord`] can be
    /// persisted, so invalid states are rejected at compile time.
    async fn persist_daemon<T: DaemonState + Clone>(&self, record: &DaemonRecord<T>) -> Result<()>
    where
        AnyDaemonRecord: From<DaemonRecord<T>>;

    /// Get daemon by ID.
    ///
    /// Returns an `AnyDaemonRecord` which can hold the daemon in any state.
    /// Use this when the caller cannot know the daemon's current state upfront.
    async fn get_daemon(&self, daemon_id: DaemonId) -> Result<AnyDaemonRecord>;

    /// List all daemons with optional status filter.
    ///
    /// If `status_filter` is `None`, returns all daemons regardless of status.
    /// Otherwise, returns only daemons matching the specified status.
    async fn list_daemons(
        &self,
        status_filter: Option<DaemonStatus>,
    ) -> Result<Vec<AnyDaemonRecord>>;

    /// Purge orphaned request_templates and requests whose parent (file or batch)
    /// has been soft-deleted or whose FK is NULL.
    ///
    /// Deletes at most `batch_size` rows from each table per call, so a single
    /// invocation performs a bounded amount of work; callers are expected to
    /// invoke it repeatedly until it reports zero deletions.
    /// Returns total rows deleted across both tables. Called periodically by
    /// the daemon purge task for right-to-erasure compliance.
    async fn purge_orphaned_rows(&self, batch_size: i64) -> Result<u64>;
}

/// Daemon executor trait for runtime orchestration.
///
/// This trait handles only the daemon lifecycle - spawning the background worker
/// that processes requests. All data operations are on the Storage trait.
#[async_trait]
pub trait DaemonExecutor<H: HttpClient>: Storage + Send + Sync {
    /// Get a reference to the HTTP client.
    fn http_client(&self) -> &Arc<H>;

    /// Get a reference to the daemon configuration.
    fn config(&self) -> &crate::daemon::DaemonConfig;

    /// Run the daemon thread.
    ///
    /// This spawns a background task responsible for actually doing the work of moving
    /// requests from one state to another, and broadcasting those status changes.
    ///
    /// The daemon will:
    /// - Claim pending requests
    /// - Execute HTTP requests
    /// - Handle retries with exponential backoff
    /// - Update request statuses
    /// - Respect per-model concurrency limits
    ///
    /// Cancelling `shutdown_token` signals the background task to stop;
    /// the returned [`JoinHandle`] resolves once the task has exited.
    ///
    /// # Errors
    /// Returns an error if the daemon fails to start.
    ///
    /// # Example
    /// ```ignore
    /// let manager = Arc::new(manager);
    /// let shutdown_token = tokio_util::sync::CancellationToken::new();
    /// let handle = manager.run(shutdown_token.clone())?;
    ///
    /// // Do work...
    ///
    /// // Request graceful shutdown, then wait for the worker to finish.
    /// shutdown_token.cancel();
    /// handle.await??;
    /// ```
    fn run(
        self: Arc<Self>,
        shutdown_token: tokio_util::sync::CancellationToken,
    ) -> Result<JoinHandle<Result<()>>>;

    // File and Batch Management methods are inherited from the Storage trait
}