1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
use std::path::PathBuf;
use uuid::Uuid;
use super::LibraryId;
use crate::chrono::{DateTime, Utc};
/// Request asking for a scan of one library.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ScanRequest {
    /// Identifier of the library the request targets.
    pub library_id: LibraryId,
    /// Refresh flag carried with the request.
    ///
    /// NOTE(review): presumably asks the scanner to ignore previously cached
    /// results; the consumer is not visible here, so confirm.
    pub force_refresh: bool,
}
/// Reply describing the state of a scan.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ScanResponse {
    /// Scan state reported by this response.
    pub status: ScanStatus,
    /// Identifier of the scan the response refers to; `None` when no scan
    /// identifier is attached.
    pub scan_id: Option<Uuid>,
    /// Free-form text accompanying the status.
    pub message: String,
}
impl ScanResponse {
    /// Builds a response from explicitly supplied parts.
    pub fn new(
        status: ScanStatus,
        scan_id: Option<Uuid>,
        message: String,
    ) -> Self {
        Self {
            status,
            scan_id,
            message,
        }
    }

    /// Builds a [`ScanStatus::Scanning`] response that carries `scan_id` and
    /// the caller-provided `message`.
    pub fn new_scan_started(scan_id: Uuid, message: String) -> Self {
        Self::new(ScanStatus::Scanning, Some(scan_id), message)
    }

    /// Builds a [`ScanStatus::Failed`] response with no scan identifier.
    pub fn new_failed(message: String) -> Self {
        Self::new(ScanStatus::Failed, None, message)
    }

    /// Builds a [`ScanStatus::Cancelled`] response for `scan_id` with the
    /// fixed message `"Scan canceled"`.
    pub fn new_canceled(scan_id: Uuid) -> Self {
        Self::new(
            ScanStatus::Cancelled,
            Some(scan_id),
            String::from("Scan canceled"),
        )
    }
}
/// Progress snapshot for a scan.
///
/// Field descriptions below restate what the names and types show; the code
/// that fills them in is not part of this file section.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ScanProgress {
    /// Identifier of the scan being reported.
    pub scan_id: Uuid,
    /// State of the scan at the time of the snapshot.
    pub status: ScanStatus,
    /// Filesystem paths associated with the scan.
    pub paths: Vec<PathBuf>,
    /// Names of the libraries involved.
    pub library_names: Vec<String>,
    /// Library identifiers, carried as plain strings.
    pub library_ids: Vec<String>,
    /// Count of folders to scan.
    pub folders_to_scan: usize,
    /// Count of folders scanned.
    pub folders_scanned: usize,
    /// Count of movies scanned.
    pub movies_scanned: usize,
    /// Count of series scanned.
    pub series_scanned: usize,
    /// Count of seasons scanned.
    pub seasons_scanned: usize,
    /// Count of episodes scanned.
    pub episodes_scanned: usize,
    /// Count of skipped samples.
    ///
    /// NOTE(review): presumably "sample" media files that were ignored; confirm.
    pub skipped_samples: usize,
    /// Error messages recorded for the scan.
    pub errors: Vec<String>,
    /// Media item currently being reported, if any.
    pub current_media: Option<String>,
    /// Library currently being reported, if any.
    pub current_library: Option<String>,
    /// UTC timestamp of the scan start.
    pub started_at: DateTime<Utc>,
    /// UTC completion timestamp; `None` when not set.
    pub completed_at: Option<DateTime<Utc>>,
    /// Estimated remaining time; `None` when no estimate is available.
    pub estimated_time_remaining: Option<std::time::Duration>,
}
/// State of a scan as reported in responses and progress snapshots.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum ScanStatus {
    /// The scan is pending.
    Pending,
    /// The scan is in progress.
    Scanning,
    /// The scan completed.
    Completed,
    /// The scan failed.
    Failed,
    /// The scan was cancelled.
    Cancelled,
}
pub mod scanner {
    pub mod settings {
        /// File extensions the scanner treats as video assets by default.
        ///
        /// Entries are written without a leading dot.
        pub const DEFAULT_VIDEO_FILE_EXTENSIONS: &[&str] = &[
            "mp4", "mkv", "avi", "mov", "webm", "flv", "wmv", "m4v", "mpg",
            "mpeg",
        ];

        /// Returns an owned copy of [`DEFAULT_VIDEO_FILE_EXTENSIONS`], in the
        /// same order, for callers that need `String` values.
        pub fn default_video_file_extensions_vec() -> Vec<String> {
            DEFAULT_VIDEO_FILE_EXTENSIONS
                .iter()
                .copied()
                .map(String::from)
                .collect()
        }
    }
}
pub mod orchestration {
pub mod budget {
    #[cfg(feature = "serde")]
    use serde::{Deserialize, Serialize};

    /// Concurrency limits applied to each class of workload.
    #[derive(Clone, Debug)]
    #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
    pub struct BudgetConfig {
        /// Library scan limit. Defaults to 1, i.e. one library scan at a time.
        pub library_scan_limit: usize,
        /// Media analysis limit. The default is kept low to avoid disk overload.
        pub media_analysis_limit: usize,
        /// Metadata limit. Defaults to twice the detected CPU count.
        pub metadata_limit: usize,
        /// Indexing limit. Defaults to the detected CPU count.
        pub indexing_limit: usize,
        /// Limit for poster/backdrop image fetch workers.
        pub image_fetch_limit: usize,
    }

    impl Default for BudgetConfig {
        /// Derives the CPU-dependent limits from
        /// `std::thread::available_parallelism`; the remaining limits are
        /// fixed constants.
        fn default() -> Self {
            // Fall back to a single core when the parallelism query fails.
            let cores = match std::thread::available_parallelism() {
                Ok(count) => count.get(),
                Err(_) => 1,
            };

            BudgetConfig {
                library_scan_limit: 1,
                media_analysis_limit: 4,
                metadata_limit: cores * 2,
                indexing_limit: cores,
                image_fetch_limit: 4,
            }
        }
    }
}
pub mod config {
use std::collections::HashMap;
use crate::ids::LibraryId;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// Top-level tuning knobs for the orchestrator.
///
/// Every field type implements `Default`, so `OrchestratorConfig::default()`
/// produces a complete configuration. The intent is that existing deployments
/// can adopt new scheduling features progressively without supplying a full
/// configuration payload.
///
/// NOTE(review): no container-level `serde(default)` is declared here, so
/// confirm that partially specified payloads deserialize as intended.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct OrchestratorConfig {
    /// Queue sizing, fairness weights, and per-library overrides.
    pub queue: QueueConfig,
    /// Weights the scheduler uses when rotating between priority buckets.
    pub priority_weights: PriorityWeights,
    /// Retry/backoff policy shared by all workers.
    pub retry: RetryConfig,
    /// Limits for metadata enrichment workers.
    pub metadata_limits: MetadataLimits,
    /// Tuning for bulk maintenance.
    pub bulk_mode: BulkModeTuning,
    /// Lease defaults: TTL, renewal thresholds, housekeeping cadence.
    pub lease: LeaseConfig,
    /// Global concurrency budget for actor workloads.
    pub budget: super::budget::BudgetConfig,
    /// Filesystem watch debounce and batching settings.
    pub watch: WatchConfig,
}
/// Queue sizing, back-pressure thresholds, and per-library fairness settings.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct QueueConfig {
    /// Maximum worker concurrency per queue. These values drive worker pool sizes.
    pub max_parallel_scans: usize,
    pub max_parallel_series_resolve: usize,
    pub max_parallel_analyses: usize,
    pub max_parallel_metadata: usize,
    pub max_parallel_index: usize,
    pub max_parallel_image_fetch: usize,
    /// Cap on scan workers touching the same device (mount).
    pub max_parallel_scans_per_device: usize,
    /// High watermark for queued jobs; above it low priority work starts
    /// being coalesced.
    pub high_watermark: usize,
    /// Critical watermark for queued jobs; above it P2/P3 work is merged
    /// instead of enqueued.
    pub critical_watermark: usize,
    /// Sliding window, in milliseconds, for aggregating duplicate work items.
    pub coalesce_window_ms: u64,
    /// Default maximum number of in-flight leases per library.
    pub default_library_cap: usize,
    /// Scheduling weight given to libraries that have no override.
    pub default_library_weight: u32,
    /// Optional per-library overrides; an absent key deserializes to an
    /// empty map when the `serde` feature is enabled.
    #[cfg_attr(feature = "serde", serde(default))]
    pub library_overrides: HashMap<LibraryId, LibraryQueuePolicy>,
}

impl Default for QueueConfig {
    /// Returns the built-in queue settings with no per-library overrides.
    fn default() -> Self {
        QueueConfig {
            // Per-queue worker concurrency.
            max_parallel_scans: 6,
            max_parallel_series_resolve: 2,
            max_parallel_analyses: 2,
            max_parallel_metadata: 4,
            max_parallel_index: 1,
            max_parallel_image_fetch: 4,
            max_parallel_scans_per_device: 16,
            // Back-pressure thresholds and coalescing window.
            high_watermark: 10_000,
            critical_watermark: 20_000,
            coalesce_window_ms: 100,
            // Per-library fairness defaults.
            default_library_cap: 32,
            default_library_weight: 1,
            library_overrides: HashMap::default(),
        }
    }
}
/// Per-library overrides for queue fairness.
///
/// Both fields default to `None`, meaning "no override".
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LibraryQueuePolicy {
    /// In-flight cap for the library; when `None`, `default_library_cap`
    /// applies instead.
    pub max_inflight: Option<usize>,
    /// Scheduling weight multiplier for the library; when `None`,
    /// `default_library_weight` applies instead.
    pub weight: Option<u32>,
}
/// Lease and heartbeat tuning for worker tasks.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LeaseConfig {
    /// Default TTL for job leases, in seconds.
    pub lease_ttl_secs: i64,
    /// Renew once the remaining TTL falls below this fraction of the
    /// original TTL (e.g. 0.5).
    pub renew_at_fraction: f32,
    /// Minimum margin before expiry, in milliseconds, that triggers a
    /// renewal regardless of the fraction.
    pub renew_min_margin_ms: u64,
    /// Interval, in milliseconds, between housekeeping passes that look for
    /// expired leases.
    pub housekeeper_interval_ms: u64,
}

impl Default for LeaseConfig {
    /// Defaults: 30 s TTL, renew at half the TTL, 2 s minimum margin,
    /// housekeeping every 15 s.
    fn default() -> Self {
        LeaseConfig {
            lease_ttl_secs: 30,
            renew_at_fraction: 0.5,
            renew_min_margin_ms: 2_000,
            housekeeper_interval_ms: 15_000,
        }
    }
}
/// Weights for the four priority levels `p0` through `p3`.
///
/// The defaults give `p0` the largest weight and `p3` the smallest.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct PriorityWeights {
    /// Weight for priority level 0.
    pub p0: u8,
    /// Weight for priority level 1.
    pub p1: u8,
    /// Weight for priority level 2.
    pub p2: u8,
    /// Weight for priority level 3.
    pub p3: u8,
}

impl Default for PriorityWeights {
    /// Defaults to weights 8, 4, 2, 1 for `p0`..`p3`.
    fn default() -> Self {
        let [p0, p1, p2, p3] = [8, 4, 2, 1];
        PriorityWeights { p0, p1, p2, p3 }
    }
}
/// Retry and backoff policy.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RetryConfig {
    /// Maximum number of attempts.
    pub max_attempts: u16,
    /// Base backoff delay in milliseconds.
    pub backoff_base_ms: u64,
    /// Maximum backoff delay in milliseconds.
    pub backoff_max_ms: u64,
    /// Number of attempts that get the "fast retry" treatment for
    /// user-visible scans.
    pub fast_retry_attempts: u16,
    /// Multiplier applied to the base delay while in the fast retry window.
    pub fast_retry_factor: f32,
    /// Once a library accumulates this many retry-heavy jobs, the whole
    /// queue is slowed.
    pub heavy_library_attempt_threshold: u16,
    /// Slowdown multiplier applied when a library is considered under stress.
    pub heavy_library_slowdown_factor: f32,
    /// Percentage-based jitter used to spread out retries.
    pub jitter_ratio: f32,
    /// Minimum jitter in milliseconds, so tiny jobs still randomise a bit.
    pub jitter_min_ms: u64,
}

impl Default for RetryConfig {
    /// Defaults: 5 attempts, 2 s base delay, 5 min maximum delay.
    fn default() -> Self {
        RetryConfig {
            max_attempts: 5,
            backoff_base_ms: 2_000,
            // Five minutes, expressed in milliseconds.
            backoff_max_ms: 300_000,
            fast_retry_attempts: 2,
            fast_retry_factor: 0.35,
            heavy_library_attempt_threshold: 4,
            heavy_library_slowdown_factor: 1.8,
            jitter_ratio: 0.25,
            jitter_min_ms: 250,
        }
    }
}

impl RetryConfig {
    /// Returns `backoff_base_ms` as a `Duration`.
    pub fn backoff_base(&self) -> core::time::Duration {
        let millis = self.backoff_base_ms;
        core::time::Duration::from_millis(millis)
    }

    /// Returns `backoff_max_ms` as a `Duration`.
    pub fn backoff_max(&self) -> core::time::Duration {
        let millis = self.backoff_max_ms;
        core::time::Duration::from_millis(millis)
    }
}
/// Limits applied to metadata work.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct MetadataLimits {
    /// Maximum concurrency.
    pub max_concurrency: usize,
    /// Maximum rate.
    ///
    /// NOTE(review): "qps" presumably means queries per second; confirm.
    pub max_qps: u32,
}

impl Default for MetadataLimits {
    /// Defaults to a concurrency of 2 and a `max_qps` of 100.
    fn default() -> Self {
        MetadataLimits {
            max_concurrency: 2,
            max_qps: 100,
        }
    }
}
/// Tuning values for bulk mode.
///
/// NOTE(review): how the scheduler applies these values is not visible in
/// this file section; the field notes only restate names and defaults.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct BulkModeTuning {
    /// Speed-up factor; defaults to 1.2.
    pub speedup_factor: f32,
    /// Number of maintenance partitions; defaults to 8.
    pub maintenance_partition_count: usize,
}

impl Default for BulkModeTuning {
    /// Defaults to a 1.2 speed-up factor and 8 maintenance partitions.
    fn default() -> Self {
        BulkModeTuning {
            speedup_factor: 1.2,
            maintenance_partition_count: 8,
        }
    }
}
/// Controls for coalescing filesystem watch events.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct WatchConfig {
    /// Debounce window in milliseconds.
    pub debounce_window_ms: u64,
    /// Maximum number of events flushed in a single batch.
    pub max_batch_events: usize,
    /// Polling cadence, in milliseconds, for filesystems without native
    /// watchers. With the `serde` feature, a missing key falls back to
    /// `WatchConfig::default_poll_interval_ms`.
    #[cfg_attr(
        feature = "serde",
        serde(default = "WatchConfig::default_poll_interval_ms")
    )]
    pub poll_interval_ms: u64,
}

impl WatchConfig {
    /// Default polling cadence (30 s), shared by `Default` and the serde
    /// field default so both stay in agreement.
    const fn default_poll_interval_ms() -> u64 {
        30_000
    }
}

impl Default for WatchConfig {
    /// Defaults: 250 ms debounce, 8192 events per batch, 30 s polling.
    fn default() -> Self {
        WatchConfig {
            debounce_window_ms: 250,
            max_batch_events: 8_192,
            poll_interval_ms: WatchConfig::default_poll_interval_ms(),
        }
    }
}
}
}