fusillade 8.0.0

A daemon implementation for sending batched LLM requests with efficient request coalescing
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
//! File and batch types for grouping requests.
//!
//! This module defines types for:
//! - Files: Collections of request templates
//! - Request templates: Mutable request definitions
//! - Batches: Execution triggers for files
//!
//! See request/ for the types for requests, since they have their logic more tightly coupled to
//! their models.

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::str::FromStr;
use uuid::Uuid;

/// Unique identifier for a file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct FileId(pub Uuid);

impl From<Uuid> for FileId {
    fn from(uuid: Uuid) -> Self {
        FileId(uuid)
    }
}

impl std::ops::Deref for FileId {
    type Target = Uuid;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl std::fmt::Display for FileId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", &self.0.to_string()[..8])
    }
}

/// Unique identifier for a batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)]
#[serde(transparent)]
pub struct BatchId(pub Uuid);

impl From<Uuid> for BatchId {
    fn from(uuid: Uuid) -> Self {
        BatchId(uuid)
    }
}

impl std::ops::Deref for BatchId {
    type Target = Uuid;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl std::fmt::Display for BatchId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Unique identifier for a request template.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)]
#[serde(transparent)]
pub struct TemplateId(pub Uuid);

impl From<Uuid> for TemplateId {
    fn from(uuid: Uuid) -> Self {
        TemplateId(uuid)
    }
}

impl std::ops::Deref for TemplateId {
    type Target = Uuid;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl std::fmt::Display for TemplateId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", &self.0.to_string()[..8])
    }
}

/// Purpose for which a file was created.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Purpose {
    /// File contains batch API request templates
    Batch,
    /// Virtual file that streams batch output (completed requests)
    BatchOutput,
    /// Virtual file that streams batch errors (failed requests)
    BatchError,
}

/// Type of batch output file for lookup purposes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputFileType {
    /// Output file containing completed requests
    Output,
    /// Error file containing failed requests
    Error,
}

impl fmt::Display for Purpose {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Purpose::Batch => write!(f, "batch"),
            Purpose::BatchOutput => write!(f, "batch_output"),
            Purpose::BatchError => write!(f, "batch_error"),
        }
    }
}

impl FromStr for Purpose {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "batch" => Ok(Purpose::Batch),
            "batch_output" => Ok(Purpose::BatchOutput),
            "batch_error" => Ok(Purpose::BatchError),
            _ => Err(format!("Invalid purpose: {}", s)),
        }
    }
}

/// File status tracking lifecycle and health.
///
/// Status tracks the file's lifecycle:
/// - `Processed`: Successfully uploaded and parsed into templates (users can access)
/// - `Error`: Failed to process during upload (only visible to admins with SystemAccess)
/// - `Deleted`: Soft-deleted by user (metadata retained for audit, only visible to admins)
/// - `Expired`: Past its expiration date (metadata retained for audit, only visible to admins)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileStatus {
    /// File was successfully processed and templates created
    Processed,
    /// File processing failed (see error_message for details)
    Error,
    /// File was soft-deleted by user (metadata retained for audit)
    Deleted,
    /// File has passed its expiration date
    Expired,
}

impl fmt::Display for FileStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FileStatus::Processed => write!(f, "processed"),
            FileStatus::Error => write!(f, "error"),
            FileStatus::Deleted => write!(f, "deleted"),
            FileStatus::Expired => write!(f, "expired"),
        }
    }
}

impl FromStr for FileStatus {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "processed" => Ok(FileStatus::Processed),
            "error" => Ok(FileStatus::Error),
            "deleted" => Ok(FileStatus::Deleted),
            "expired" => Ok(FileStatus::Expired),
            _ => Err(format!("Invalid file status: {}", s)),
        }
    }
}

/// A file containing a collection of request templates.
#[derive(Debug, Clone, Serialize)]
pub struct File {
    pub id: FileId,
    pub name: String,
    pub description: Option<String>,
    pub size_bytes: i64,
    pub status: FileStatus,
    pub error_message: Option<String>,
    pub purpose: Option<Purpose>,
    pub expires_at: Option<DateTime<Utc>>,
    pub deleted_at: Option<DateTime<Utc>>,
    pub uploaded_by: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub size_finalized: bool,
}

/// A request template defining how to make a request.
///
/// Templates are mutable, but requests snapshot the template state
/// at execution time.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct RequestTemplate {
    pub id: TemplateId,
    pub file_id: FileId,
    pub custom_id: Option<String>, // OpenAI Batch API custom identifier
    pub endpoint: String,
    pub method: String,
    pub path: String,
    pub body: String,
    pub model: String,
    pub api_key: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

/// Input for creating a new request template.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, serde::Deserialize)]
pub struct RequestTemplateInput {
    pub custom_id: Option<String>, // OpenAI Batch API custom identifier
    pub endpoint: String,
    pub method: String,
    pub path: String,
    pub body: String,
    pub model: String,
    pub api_key: String,
}

/// Batch output item - represents a completed request in OpenAI format.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct BatchOutputItem {
    /// Request ID
    pub id: String,
    /// Custom ID from the original request
    pub custom_id: Option<String>,
    /// Response details
    pub response: BatchResponseDetails,
    /// Error (should be null for successful responses)
    pub error: Option<serde_json::Value>,
}

/// Batch error item - represents a failed request in OpenAI format.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct BatchErrorItem {
    /// Request ID
    pub id: String,
    /// Custom ID from the original request
    pub custom_id: Option<String>,
    /// Response (should be null for errors)
    pub response: Option<serde_json::Value>,
    /// Error details
    pub error: BatchErrorDetails,
}

/// Response details for a batch output item.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct BatchResponseDetails {
    /// HTTP status code
    pub status_code: i16,
    /// Request ID from the upstream API
    pub request_id: Option<String>,
    /// Response body (e.g., ChatCompletion, Embedding, etc.)
    pub body: serde_json::Value,
}

/// Error details for a batch error item.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct BatchErrorDetails {
    /// Error code
    pub code: Option<String>,
    /// Error message
    pub message: String,
}

/// Enum for different types of file content that can be streamed.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum FileContentItem {
    /// Request template (for input files with purpose='batch')
    Template(RequestTemplateInput),
    /// Batch output (for output files with purpose='batch_output')
    Output(BatchOutputItem),
    /// Batch error (for error files with purpose='batch_error')
    Error(BatchErrorItem),
}

/// Status of a batch result item.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BatchResultStatus {
    /// Request is pending execution
    Pending,
    /// Request is currently being processed (claimed or processing)
    InProgress,
    /// Request completed successfully
    Completed,
    /// Request failed with an error
    Failed,
}

/// Merged batch result item combining input, output, and status.
/// Used for the Results view to show input/output pairs in a single row.
#[derive(Debug, Clone, Serialize)]
pub struct BatchResultItem {
    /// Fusillade request ID (unique identifier)
    pub id: String,
    /// User-provided identifier (NOT unique - may be duplicated)
    pub custom_id: Option<String>,
    /// Model used for this request
    pub model: String,
    /// Original request body from the input template
    pub input_body: serde_json::Value,
    /// Full response object (choices, usage, etc.) for completed requests
    pub response_body: Option<serde_json::Value>,
    /// Error message for failed requests
    pub error: Option<String>,
    /// Current status of the request
    pub status: BatchResultStatus,
}

/// Metadata for creating a file from a stream
#[derive(Debug, Clone, Default, Serialize)]
pub struct FileMetadata {
    pub filename: Option<String>,
    pub description: Option<String>,
    pub purpose: Option<String>,
    pub expires_after_anchor: Option<String>,
    pub expires_after_seconds: Option<i64>,
    pub size_bytes: Option<i64>,
    pub uploaded_by: Option<String>,
}

/// Filter parameters for listing files
#[derive(Debug, Clone, Default)]
pub struct FileFilter {
    /// Filter by user who uploaded the file
    /// TODO: We use a string here, because this crate is decoupled from the dwctl one which uses a
    /// UUID. Is this fine? This just needs to be a unique identifier per user.
    pub uploaded_by: Option<String>,
    /// Filter by file status (processed, error, deleted, expired)
    pub status: Option<String>,
    /// Filter by purpose
    pub purpose: Option<String>,
    /// Search query for filename (case-insensitive substring match)
    pub search: Option<String>,
    /// Cursor for pagination (file ID to start after)
    pub after: Option<FileId>,
    /// Maximum number of results to return
    pub limit: Option<usize>,
    /// Sort order (true = ascending, false = descending)
    pub ascending: bool,
}

/// Items that can be yielded from a file upload stream
#[derive(Debug, Clone, Serialize)]
pub enum FileStreamItem {
    /// File metadata (should be first item in stream)
    Metadata(FileMetadata),
    /// A request template parsed from JSONL
    Template(RequestTemplateInput),
    /// An error occurred during parsing
    Error(String),
}

/// Input parameters for creating a new batch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchInput {
    /// The file containing request templates
    pub file_id: FileId,
    /// The API endpoint to use for all requests (e.g., "/v1/chat/completions")
    pub endpoint: String,
    /// Completion window (e.g., "24h")
    pub completion_window: String,
    /// Optional metadata key-value pairs (OpenAI allows up to 16 pairs)
    pub metadata: Option<serde_json::Value>,
    /// User who created this batch (for ownership tracking)
    pub created_by: Option<String>,
}

/// A batch represents one execution of all of a file's templates.
#[derive(Debug, Clone, Serialize)]
pub struct Batch {
    pub id: BatchId,
    pub file_id: Option<FileId>,
    pub created_at: DateTime<Utc>,
    /// Metadata key-value pairs (OpenAI allows up to 16 pairs)
    pub metadata: Option<serde_json::Value>,
    /// Completion window (e.g., "24h")
    pub completion_window: String,
    /// The API endpoint to use for all requests (e.g., "/v1/chat/completions")
    pub endpoint: String,
    /// File ID containing the successful results
    pub output_file_id: Option<FileId>,
    /// File ID containing the error results
    pub error_file_id: Option<FileId>,
    /// User who created this batch
    pub created_by: Option<String>,
    /// When the batch will expire (created_at + completion_window)
    /// This is required for queue prioritization and SLA monitoring
    pub expires_at: DateTime<Utc>,
    /// When batch cancellation was initiated
    pub cancelling_at: Option<DateTime<Utc>>,
    /// Batch-level errors (validation errors, system errors, etc.)
    pub errors: Option<serde_json::Value>,

    /// Status fields
    pub total_requests: i64,
    pub pending_requests: i64,
    pub in_progress_requests: i64,
    pub completed_requests: i64,
    pub failed_requests: i64,
    pub canceled_requests: i64,
    pub requests_started_at: Option<DateTime<Utc>>,

    /// Terminal state timestamps (set once when batch enters that state)
    pub finalizing_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub failed_at: Option<DateTime<Utc>>,
    pub cancelled_at: Option<DateTime<Utc>>,

    /// When batch was soft-deleted. NULL means active.
    pub deleted_at: Option<DateTime<Utc>>,

    /// When batch completion notification was sent. NULL means not yet notified.
    pub notification_sent_at: Option<DateTime<Utc>>,
}

/// A batch with extra context for notification emails (file metadata, model names).
/// Returned by `poll_completed_batches` which joins the files and requests tables.
#[derive(Debug, Clone)]
pub struct BatchNotification {
    pub batch: Batch,
    pub model: String,
    pub input_file_name: Option<String>,
    pub input_file_description: Option<String>,
}

/// Status information for a batch, computed from its executions.
#[derive(Debug, Clone, Serialize)]
pub struct BatchStatus {
    pub batch_id: BatchId,
    pub file_id: Option<FileId>,
    pub file_name: Option<String>,
    pub total_requests: i64,
    pub pending_requests: i64,
    pub in_progress_requests: i64,
    pub completed_requests: i64,
    pub failed_requests: i64,
    pub canceled_requests: i64,
    pub started_at: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
}

impl BatchStatus {
    /// Check if the batch has finished (all requests in terminal state).
    pub fn is_finished(&self) -> bool {
        self.completed_requests + self.failed_requests + self.canceled_requests
            == self.total_requests
    }

    /// Check if the batch is still running.
    pub fn is_running(&self) -> bool {
        !self.is_finished()
    }

    /// Get OpenAI-compatible status string.
    /// Maps internal state to OpenAI's status values:
    /// - "validating" - batch just created, no requests started yet
    /// - "in_progress" - batch is being processed
    /// - "finalizing" - nearly all requests done (95%+ complete)
    /// - "completed" - all requests in terminal state and at least one succeeded
    /// - "failed" - all requests in terminal state and all failed
    /// - "cancelled" - all requests cancelled
    pub fn openai_status(&self) -> &'static str {
        if self.total_requests == 0 {
            return "validating";
        }

        let terminal_count =
            self.completed_requests + self.failed_requests + self.canceled_requests;

        if terminal_count == 0 {
            // Nothing has started yet
            "validating"
        } else if terminal_count == self.total_requests {
            // All done - determine outcome
            if self.canceled_requests == self.total_requests {
                "cancelled"
            } else if self.completed_requests == 0 {
                "failed"
            } else {
                "completed"
            }
        } else if terminal_count as f64 / self.total_requests as f64 >= 0.95 {
            // Nearly done (95%+)
            "finalizing"
        } else {
            // In progress
            "in_progress"
        }
    }
}

/// Aggregated statistics for a model's templates in a file.
/// Used for efficient cost estimation without streaming all template data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelTemplateStats {
    /// The model name
    pub model: String,
    /// Number of request templates using this model
    pub request_count: i64,
    /// Total size of all request bodies in bytes
    pub total_body_bytes: i64,
}