//! smoo_gadget_app — lib.rs
//!
//! Daemon wiring for the smoo USB gadget: CLI parsing, ConfigFS/FunctionFS
//! setup, ublk device management, and the main event loop.
1use anyhow::{anyhow, ensure, Context, Result};
2use clap::{Parser, ValueEnum};
3use hyper::service::{make_service_fn, service_fn};
4use hyper::{Body, Request as HttpRequest, Response as HttpResponse, Server, StatusCode};
5use metrics_exporter_prometheus::PrometheusBuilder;
6use smoo_gadget_core::{
7    ConfigExport, ConfigExportsV0, ControlIo, DeviceHandle, DmaHeap, ExportController, ExportFlags,
8    ExportReconcileContext, ExportSpec, ExportState, FunctionfsEndpoints, GadgetConfig,
9    GadgetControl, GadgetStatusReport, IoPumpHandle, IoWork, LinkCommand, LinkController,
10    LinkState, PersistedExportRecord, RuntimeTunables, SetupCommand, SetupPacket, SmooGadget,
11    SmooUblk, SmooUblkDevice, StateStore, UblkIoRequest, UblkOp, UblkQueueRuntime,
12};
13use smoo_proto::{Ident, OpCode, Request, SMOO_STATUS_REQUEST, SMOO_STATUS_REQ_TYPE};
14use std::{
15    collections::{HashMap, HashSet, VecDeque},
16    convert::Infallible,
17    fs::File,
18    io,
19    net::SocketAddr,
20    os::fd::{FromRawFd, IntoRawFd, OwnedFd},
21    path::{Path, PathBuf},
22    sync::{
23        atomic::{AtomicU64, Ordering},
24        Arc,
25    },
26    time::{Duration, Instant},
27};
28use tokio::{
29    signal,
30    signal::unix::{signal as unix_signal, SignalKind},
31    sync::{
32        mpsc,
33        mpsc::error::{TryRecvError, TrySendError},
34        watch, Mutex, Notify, RwLock,
35    },
36    task::JoinHandle,
37};
38use tokio_util::sync::CancellationToken;
39use tracing::{debug, error, info, trace, warn};
40use usb_gadget::{
41    function::custom::{
42        CtrlReceiver, CtrlReq, CtrlSender, Custom, CustomBuilder, Endpoint, EndpointDirection,
43        Event, Interface, TransferType,
44    },
45    Class, Config, Gadget, Id, RegGadget, Strings,
46};
47
// USB interface class/subclass/protocol for the native smoo interface.
// 0xFF is the vendor-specific class; 0x53/0x4D are ASCII 'S'/'M'.
// Presumably consumed when building the ConfigFS descriptors in
// setup_configfs — confirm against that helper.
const SMOO_CLASS: u8 = 0xFF;
const SMOO_SUBCLASS: u8 = 0x53;
const SMOO_PROTOCOL: u8 = 0x4D;
// Alternative subclass/protocol used with --mimic-fastboot so restrictive
// WebUSB flows that only allow fastboot-looking interfaces will connect.
// 0x42/0x03 match the fastboot interface convention — TODO confirm.
const FASTBOOT_SUBCLASS: u8 = 0x42;
const FASTBOOT_PROTOCOL: u8 = 0x03;
// Per-I/O size advertised to ublk when --max-io is not given: 4 MiB.
const DEFAULT_MAX_IO_BYTES: usize = 4 * 1024 * 1024;
// Depth of the EP0 control-task -> event-loop config channel.
const CONFIG_CHANNEL_DEPTH: usize = 32;
// Depth of the queue-task -> event-loop IO event channel.
const QUEUE_CHANNEL_DEPTH: usize = 128;
// Max queue events handled per event-loop tick (fairness bound).
const QUEUE_BATCH_MAX: usize = 32;
// Max parked requests replayed per maintenance slice.
const OUTSTANDING_BATCH_MAX: usize = 32;
// Event-loop idle tick period.
const IDLE_INTERVAL_MS: u64 = 10;
// Period of the liveness interval timer in the event loop.
const LIVENESS_INTERVAL_MS: u64 = 500;
// Wall-clock budget for one maintenance pass (outstanding drain + reconcile).
const MAINTENANCE_SLICE_MS: u64 = 200;
// Timeout for a single export reconcile attempt before backing off.
const RECONCILE_TIMEOUT_MS: u64 = 200;
// Presumably bounds ShutdownState::Graceful before forcing — confirm in
// run_event_loop (its body is below this view).
const GRACEFUL_SHUTDOWN_TIMEOUT_MS: u64 = 5_000;
63
// Command-line arguments for the smoo gadget daemon.
//
// NOTE: the field-level `///` docs double as clap help text, so they are part
// of the CLI surface — keep them user-facing. The manual `Default` impl
// further down mirrors the `default_value` attributes here and must stay in
// sync with them.
#[derive(Debug, Parser)]
#[command(name = "smoo-gadget", version)]
#[command(about = "Expose a smoo gadget backed by FunctionFS + ublk", long_about = None)]
pub struct Args {
    /// USB vendor ID for the gadget (hex).
    #[arg(long, value_name = "HEX", default_value = "0xDEAD", value_parser = parse_hex_u16)]
    pub vendor_id: u16,
    /// USB product ID for the gadget (hex).
    #[arg(long, value_name = "HEX", default_value = "0xBEEF", value_parser = parse_hex_u16)]
    pub product_id: u16,
    /// Number of ublk queues to configure.
    #[arg(long, default_value_t = 1)]
    pub queue_count: u16,
    /// Depth of each ublk queue.
    #[arg(long, default_value_t = 16)]
    pub queue_depth: u16,
    /// Maximum per-I/O size in bytes to advertise to ublk (block-aligned).
    // None falls back to DEFAULT_MAX_IO_BYTES in run_impl.
    #[arg(long = "max-io", value_name = "BYTES")]
    pub max_io_bytes: Option<usize>,
    /// Opt-in to the experimental DMA-BUF fast path when supported by the kernel.
    #[arg(long)]
    pub experimental_dma_buf: bool,
    /// DMA-HEAP to allocate from when DMA-BUF mode is enabled.
    // Only consulted when experimental_dma_buf is set (see run_impl).
    #[arg(long, value_enum, default_value_t = DmaHeapSelection::System)]
    pub dma_heap: DmaHeapSelection,
    /// Path to the recovery state file. When unset, crash recovery is disabled.
    #[arg(long, value_name = "PATH")]
    pub state_file: Option<PathBuf>,
    /// Adopt existing ublk devices via user recovery.
    #[arg(long)]
    pub adopt: bool,
    /// Expose Prometheus metrics on this TCP port (0 disables).
    #[arg(long, default_value_t = 0)]
    pub metrics_port: u16,
    /// Use an existing FunctionFS directory and skip configfs management.
    #[arg(long, value_name = "PATH")]
    pub ffs_dir: Option<PathBuf>,
    /// Use fastboot-style interface subclass/protocol for restrictive WebUSB flows.
    #[arg(long)]
    pub mimic_fastboot: bool,
}
105
// CLI selector for which DMA-HEAP to allocate from; mapped 1:1 onto
// `smoo_gadget_core::DmaHeap` by the `From` impl in this file.
// NOTE: variant `///` docs would surface in clap's value help, so the
// variants are deliberately left bare.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum DmaHeapSelection {
    System,
    Cma,
    Reserved,
}
112
// Bridge the CLI enum to the core crate's heap selector; a straight 1:1
// mapping with no default/fallback arm so a new variant forces an update.
impl From<DmaHeapSelection> for DmaHeap {
    fn from(value: DmaHeapSelection) -> Self {
        match value {
            DmaHeapSelection::System => DmaHeap::System,
            DmaHeapSelection::Cma => DmaHeap::Cma,
            DmaHeapSelection::Reserved => DmaHeap::Reserved,
        }
    }
}
122
123impl Default for Args {
124    fn default() -> Self {
125        Self {
126            vendor_id: 0xDEAD,
127            product_id: 0xBEEF,
128            queue_count: 1,
129            queue_depth: 16,
130            max_io_bytes: None,
131            experimental_dma_buf: false,
132            dma_heap: DmaHeapSelection::System,
133            state_file: None,
134            adopt: false,
135            metrics_port: 0,
136            ffs_dir: None,
137            mimic_fastboot: false,
138        }
139    }
140}
141
142pub async fn run_from_env() -> Result<()> {
143    let args = Args::parse();
144    let result = run_impl(args).await;
145    if let Err(err) = &result {
146        error!(error = ?err, "smoo-gadget exiting with error");
147    }
148    result
149}
150
/// Run the gadget with caller-supplied `Args`, bypassing CLI parsing.
/// Unlike `run_from_env`, errors are returned without being logged here.
pub async fn run_with_args(args: Args) -> Result<()> {
    run_impl(args).await
}
154
/// Top-level orchestration: bring up metrics, ublk, state recovery, ConfigFS,
/// the gadget core and the EP0 control task, then hand off to the main event
/// loop. Tears down metrics and the control task once the loop returns.
async fn run_impl(args: Args) -> Result<()> {
    // Metrics first so later phases can record; port 0 disables the listener.
    let metrics_shutdown = CancellationToken::new();
    let metrics_task = spawn_metrics_listener(args.metrics_port, metrics_shutdown.clone())?;
    let mut ublk = SmooUblk::new().context("init ublk")?;
    // State store: a corrupt/unreadable file degrades to a fresh session
    // rather than aborting startup.
    let mut state_store = if let Some(path) = args.state_file.as_ref() {
        info!(path = ?path, "state file configured");
        match StateStore::load(path.clone()) {
            Ok(store) => store,
            Err(err) => {
                warn!(path = ?path, error = ?err, "failed to load state file; starting new session");
                StateStore::new_with_path(path.clone())
            }
        }
    } else {
        debug!("state file disabled; crash recovery off");
        StateStore::new()
    };

    initialize_session(&mut ublk, &mut state_store).await?;
    // Optionally adopt pre-existing ublk devices (user recovery path).
    if args.adopt {
        adopt_prepare(&mut ublk, &mut state_store).await?;
    }

    // ConfigFS/FunctionFS setup. The returned guards keep the gadget
    // registered/mounted for the lifetime of this function.
    let (custom, endpoints, _gadget_guard, _ffs_dir) =
        setup_configfs(&args).context("setup ConfigFS")?;

    let ident = Ident::new(0, 1);
    // DMA-BUF fast path is opt-in; `None` keeps the regular copy path.
    let dma_heap = args.experimental_dma_buf.then(|| args.dma_heap.into());
    let max_io_bytes = args.max_io_bytes.unwrap_or(DEFAULT_MAX_IO_BYTES);
    let gadget_config = GadgetConfig::new(
        ident,
        args.queue_count,
        args.queue_depth,
        max_io_bytes,
        dma_heap,
    );
    let gadget =
        Arc::new(SmooGadget::new(endpoints, gadget_config).context("init smoo gadget core")?);
    info!(
        ident_major = ident.major,
        ident_minor = ident.minor,
        queues = args.queue_count,
        depth = args.queue_depth,
        max_io_bytes = max_io_bytes,
        "smoo gadget initialized"
    );

    // EP0 control plane: a dedicated task owns the `custom` function and
    // feeds config messages / status signals back toward the event loop.
    let control_handler = gadget.control_handler();
    let (control_tx, control_rx) = mpsc::channel(CONFIG_CHANNEL_DEPTH);
    let (control_stop_tx, control_stop_rx) = watch::channel(false);

    // Seed export controllers from any persisted records.
    let exports = build_initial_exports(&state_store);
    let initial_export_count = count_active_exports(&exports);
    let status = GadgetStatusShared::new(GadgetStatus::new(
        state_store.session_id(),
        initial_export_count,
    ));
    let ep0_signals = Ep0Signals::new();
    let control_task = tokio::spawn(control_loop(
        custom,
        control_handler,
        status.clone(),
        ep0_signals.clone(),
        control_stop_rx,
        control_tx,
    ));
    let tunables = RuntimeTunables {
        queue_count: args.queue_count,
        queue_depth: args.queue_depth,
        max_io_bytes: args.max_io_bytes,
        dma_heap,
    };
    let link = LinkController::new(Duration::from_secs(3));
    // Enough pump slots for every queue entry to be in flight at once.
    let io_pump_capacity = args.queue_count as usize * args.queue_depth as usize;
    let runtime = RuntimeState {
        state_store,
        status,
        exports,
        queue_tasks: HashMap::new(),
        tunables,
        gadget: Some(gadget),
        io_pump: None,
        io_pump_task: None,
        io_pump_capacity,
        reconcile_queue: VecDeque::new(),
        data_plane_epoch: 0,
    };
    let result = run_event_loop(
        &mut ublk,
        runtime,
        control_rx,
        link,
        ep0_signals,
        control_stop_tx.clone(),
    )
    .await;
    // Shutdown order: stop metrics, then signal the control task; the abort
    // is a backstop in case the task ignores the stop signal.
    metrics_shutdown.cancel();
    if let Some(task) = metrics_task {
        let _ = task.await;
    }
    let _ = control_stop_tx.send(true);
    control_task.abort();
    let _ = control_task.await;
    result
}
260
261fn spawn_metrics_listener(
262    port: u16,
263    shutdown: CancellationToken,
264) -> Result<Option<JoinHandle<()>>> {
265    if port == 0 {
266        return Ok(None);
267    }
268    let handle = PrometheusBuilder::new()
269        .install_recorder()
270        .context("install Prometheus metrics recorder")?;
271    let addr = SocketAddr::from(([0, 0, 0, 0], port));
272    let task = tokio::spawn(async move {
273        let make_svc = make_service_fn(move |_conn| {
274            let handle = handle.clone();
275            async move {
276                Ok::<_, Infallible>(service_fn(move |req: HttpRequest<Body>| {
277                    let handle = handle.clone();
278                    async move {
279                        if req.uri().path() != "/metrics" {
280                            return Ok::<_, Infallible>(
281                                HttpResponse::builder()
282                                    .status(StatusCode::NOT_FOUND)
283                                    .body(Body::from("not found"))
284                                    .unwrap(),
285                            );
286                        }
287                        let body = handle.render();
288                        Ok::<_, Infallible>(
289                            HttpResponse::builder()
290                                .status(StatusCode::OK)
291                                .header(hyper::header::CONTENT_TYPE, "text/plain; version=0.0.4")
292                                .body(Body::from(body))
293                                .unwrap(),
294                        )
295                    }
296                }))
297            }
298        });
299
300        let server = Server::bind(&addr).serve(make_svc);
301        let graceful = server.with_graceful_shutdown(async {
302            shutdown.cancelled().await;
303        });
304
305        if let Err(err) = graceful.await {
306            warn!(error = %err, %addr, "metrics server error");
307        }
308    });
309
310    info!(%addr, "metrics listener started");
311    Ok(Some(task))
312}
313
/// Point-in-time status reported to the host: session id + live export count.
#[derive(Clone, Copy, Debug)]
struct GadgetStatus {
    // Session id from the state store (see run_impl).
    session_id: u64,
    // Count of currently active exports.
    export_count: u32,
}
319
impl GadgetStatus {
    /// Construct a status snapshot from its two components.
    fn new(session_id: u64, export_count: u32) -> Self {
        Self {
            session_id,
            export_count,
        }
    }
}
328
/// Clonable, task-shared wrapper around the current `GadgetStatus`.
/// Clones share the same underlying cell (Arc + async RwLock).
#[derive(Clone)]
struct GadgetStatusShared {
    inner: Arc<RwLock<GadgetStatus>>,
}
333
334impl GadgetStatusShared {
335    fn new(initial: GadgetStatus) -> Self {
336        Self {
337            inner: Arc::new(RwLock::new(initial)),
338        }
339    }
340
341    async fn snapshot(&self) -> GadgetStatus {
342        *self.inner.read().await
343    }
344
345    async fn report(&self) -> GadgetStatusReport {
346        let snapshot = self.snapshot().await;
347        GadgetStatusReport::new(snapshot.session_id, snapshot.export_count)
348    }
349
350    async fn set_export_count(&self, export_count: u32) {
351        let mut guard = self.inner.write().await;
352        guard.export_count = export_count;
353    }
354}
355
/// Signals published by the EP0 control task and consumed by the event loop.
///
/// The sequence counters are monotonic; a consumer compares them against its
/// last-seen values to detect new activity without holding the buffer lock.
#[derive(Clone)]
struct Ep0Signals {
    // Bumped on every host status ping.
    status_seq: Arc<AtomicU64>,
    // Bumped whenever a lifecycle event is pushed.
    lifecycle_seq: Arc<AtomicU64>,
    // Buffered lifecycle events awaiting the event loop.
    lifecycle: Arc<Mutex<Vec<Event<'static>>>>,
    // Wakes anyone waiting on either kind of signal.
    notify: Arc<Notify>,
}
363
364impl Ep0Signals {
365    fn new() -> Self {
366        Self {
367            status_seq: Arc::new(AtomicU64::new(0)),
368            lifecycle_seq: Arc::new(AtomicU64::new(0)),
369            lifecycle: Arc::new(Mutex::new(Vec::new())),
370            notify: Arc::new(Notify::new()),
371        }
372    }
373
374    fn status_seq(&self) -> u64 {
375        self.status_seq.load(Ordering::Relaxed)
376    }
377
378    fn lifecycle_seq(&self) -> u64 {
379        self.lifecycle_seq.load(Ordering::Relaxed)
380    }
381
382    fn mark_status_ping(&self) {
383        self.status_seq.fetch_add(1, Ordering::Relaxed);
384        self.notify.notify_waiters();
385    }
386
387    async fn push_lifecycle(&self, event: Event<'static>) {
388        let mut guard = self.lifecycle.lock().await;
389        guard.push(event);
390        self.lifecycle_seq.fetch_add(1, Ordering::Relaxed);
391        self.notify.notify_waiters();
392    }
393
394    async fn take_lifecycle(&self) -> Vec<Event<'static>> {
395        let mut guard = self.lifecycle.lock().await;
396        guard.drain(..).collect()
397    }
398
399    fn notifier(&self) -> Arc<Notify> {
400        self.notify.clone()
401    }
402}
403
/// Mutable state owned by the event loop.
struct RuntimeState {
    // Crash-recovery persistence.
    state_store: StateStore,
    // Status shared with the EP0 control task.
    status: GadgetStatusShared,
    // Export controllers keyed by export_id.
    exports: HashMap<u32, ExportController>,
    // Per-export queue task sets, keyed by export_id.
    queue_tasks: HashMap<u32, QueueTaskSet>,
    // Knobs applied during export reconciliation.
    tunables: RuntimeTunables,
    // The gadget core; None presumably means the data plane is torn down
    // (ensure_data_plane drops the pump in that case) — confirm in the
    // link-command handling below this view.
    gadget: Option<Arc<SmooGadget>>,
    // Handle to the IO pump, if running.
    io_pump: Option<IoPumpHandle>,
    // Join handle for the pump task (polled for unexpected exit).
    io_pump_task: Option<JoinHandle<()>>,
    // Slot capacity used when spawning a new pump.
    io_pump_capacity: usize,
    // Export ids queued for reconciliation (deduplicated on insert).
    reconcile_queue: VecDeque<u32>,
    // Generation counter attached to data-plane error notifications.
    data_plane_epoch: u64,
}
417
impl RuntimeState {
    /// Shared status cell advertised to the host.
    fn status(&self) -> &GadgetStatusShared {
        &self.status
    }

    /// Mutable access to the crash-recovery state store.
    fn state_store(&mut self) -> &mut StateStore {
        &mut self.state_store
    }
}
427
/// Channel used by per-queue tasks to deliver IO events to the event loop.
type QueueSender = mpsc::Sender<QueueEvent>;

/// Handles for the per-queue tasks serving one export's ublk device, plus the
/// shared stop signal used to wind them down.
struct QueueTaskSet {
    // Broadcast "stop" flag observed by every task in `handles`.
    stop: watch::Sender<bool>,
    handles: Vec<JoinHandle<()>>,
}
434
435impl QueueTaskSet {
436    async fn shutdown(self) {
437        let _ = self.stop.send(true);
438        for handle in self.handles {
439            let _ = handle.await;
440        }
441    }
442
443    fn abort(self) {
444        let _ = self.stop.send(true);
445        for handle in self.handles {
446            handle.abort();
447        }
448    }
449}
450
/// Events emitted by `queue_task_loop` toward the event loop.
enum QueueEvent {
    /// A ublk IO request ready to be forwarded to the host.
    Request {
        export_id: u32,
        dev_id: u32,
        request: UblkIoRequest,
        // Runtime needed later to complete the request against ublk.
        queues: Arc<UblkQueueRuntime>,
    },
    /// The queue task failed pulling IO from the device.
    QueueError {
        export_id: u32,
        dev_id: u32,
        error: anyhow::Error,
    },
}
464
/// Out-of-band data-plane failure notifications. `epoch` carries the
/// `RuntimeState::data_plane_epoch` at send time — presumably so the receiver
/// can ignore notifications from an already-torn-down pump generation
/// (confirm in the event-loop handler below this view).
#[derive(Debug)]
enum DataPlaneEvent {
    IoError { epoch: u64, error: io::Error },
}
469
470fn notify_data_plane_error(tx: &mpsc::UnboundedSender<DataPlaneEvent>, epoch: u64, err: io::Error) {
471    let _ = tx.send(DataPlaneEvent::IoError { epoch, error: err });
472}
473
/// An IO request pulled from ublk but not yet completed against the host;
/// parked so it can be replayed once the link/device is healthy again.
struct OutstandingRequest {
    // Device the request came from; a mismatch with the export's current
    // device marks the request stale (see drain_outstanding_bounded).
    dev_id: u32,
    request: UblkIoRequest,
    // Runtime used to complete or re-park the request.
    queues: Arc<UblkQueueRuntime>,
}
479
480fn build_initial_exports(state_store: &StateStore) -> HashMap<u32, ExportController> {
481    let mut exports = HashMap::new();
482    for record in state_store.records() {
483        if exports.contains_key(&record.export_id) {
484            warn!(
485                export_id = record.export_id,
486                "duplicate export_id in state store; skipping"
487            );
488            continue;
489        }
490        let state = match record.assigned_dev_id {
491            Some(dev_id) => ExportState::RecoveringPending { dev_id },
492            None => ExportState::New,
493        };
494        exports.insert(
495            record.export_id,
496            ExportController::new(record.export_id, record.spec.clone(), state),
497        );
498    }
499    exports
500}
501
502fn spawn_queue_tasks(
503    export_id: u32,
504    dev_id: u32,
505    queues: Arc<UblkQueueRuntime>,
506    tx: QueueSender,
507) -> QueueTaskSet {
508    let (stop, stop_rx) = watch::channel(false);
509    let mut handles = Vec::new();
510    for queue_id in 0..queues.queue_count() {
511        let mut stop_rx = stop_rx.clone();
512        let queues = queues.clone();
513        let tx = tx.clone();
514        handles.push(tokio::spawn(async move {
515            queue_task_loop(export_id, dev_id, queue_id, queues, &mut stop_rx, tx).await;
516        }));
517    }
518    QueueTaskSet { stop, handles }
519}
520
/// Per-queue pump: pull IO requests from one ublk queue and forward them to
/// the event loop until stopped, the channel closes, or the queue errors.
async fn queue_task_loop(
    export_id: u32,
    dev_id: u32,
    queue_id: u16,
    queues: Arc<UblkQueueRuntime>,
    stop: &mut watch::Receiver<bool>,
    tx: QueueSender,
) {
    loop {
        tokio::select! {
            // Any change on the stop channel (including sender drop) ends the task.
            _changed = stop.changed() => {
                break;
            }
            req = queues.next_io(queue_id) => {
                match req {
                    Ok(request) => {
                        // Race the send against the stop signal so shutdown stays
                        // prompt even when the event-loop channel is full.
                        let send_fut = tx.send(QueueEvent::Request { export_id, dev_id, request, queues: queues.clone() });
                        tokio::select! {
                            res = send_fut => {
                                if res.is_err() {
                                    // Event loop dropped the receiver; stop serving.
                                    break;
                                }
                            }
                            _ = stop.changed() => break,
                        }
                    }
                    Err(err) => {
                        // Only report the error if we weren't already told to stop;
                        // errors during shutdown are expected noise.
                        if !*stop.borrow() {
                            let send_fut = tx.send(QueueEvent::QueueError { export_id, dev_id, error: err });
                            let _ = tokio::select! {
                                res = send_fut => res,
                                _ = stop.changed() => Ok(()),
                            };
                        }
                        break;
                    }
                }
            }
        }
    }
}
562
563async fn sync_queue_tasks(runtime: &mut RuntimeState, queue_tx: &QueueSender) {
564    if runtime.io_pump.is_none() {
565        stop_all_queue_tasks(runtime).await;
566        return;
567    }
568    let mut to_stop: Vec<u32> = runtime
569        .queue_tasks
570        .keys()
571        .cloned()
572        .filter(|export_id| !runtime.exports.contains_key(export_id))
573        .collect();
574
575    for (&export_id, controller) in runtime.exports.iter() {
576        let should_run = controller
577            .device_handle()
578            .map(|h| {
579                matches!(
580                    h,
581                    DeviceHandle::Online { .. } | DeviceHandle::Starting { .. }
582                )
583            })
584            .unwrap_or(false);
585        let running = runtime.queue_tasks.contains_key(&export_id);
586        if should_run && runtime.io_pump.is_some() && !running {
587            if let Some(handle) = controller.device_handle() {
588                if let Some(queues) = handle.queues() {
589                    let tasks =
590                        spawn_queue_tasks(export_id, handle.dev_id(), queues, queue_tx.clone());
591                    runtime.queue_tasks.insert(export_id, tasks);
592                }
593            }
594        } else if !should_run && running {
595            to_stop.push(export_id);
596        }
597    }
598
599    for export_id in to_stop {
600        if let Some(tasks) = runtime.queue_tasks.remove(&export_id) {
601            tasks.shutdown().await;
602        }
603    }
604}
605
606async fn stop_all_queue_tasks(runtime: &mut RuntimeState) {
607    let mut tasks = std::mem::take(&mut runtime.queue_tasks);
608    for (_, taskset) in tasks.drain() {
609        taskset.shutdown().await;
610    }
611}
612
613async fn ensure_data_plane(runtime: &mut RuntimeState) {
614    if runtime.gadget.is_none() {
615        if let Some(pump) = runtime.io_pump.take() {
616            drop(pump);
617        }
618        if let Some(task) = runtime.io_pump_task.take() {
619            task.abort();
620            let _ = task.await;
621        }
622        return;
623    }
624
625    if runtime.io_pump.is_none() {
626        if let Some(gadget) = runtime.gadget.clone() {
627            let (handle, task) = IoPumpHandle::spawn(gadget, runtime.io_pump_capacity);
628            runtime.io_pump = Some(handle);
629            runtime.io_pump_task = Some(task);
630        }
631    }
632}
633
634async fn drain_ep0_signals(
635    ep0_signals: &Ep0Signals,
636    last_status_seq: &mut u64,
637    last_lifecycle_seq: &mut u64,
638    link: &mut LinkController,
639) {
640    let status_seq = ep0_signals.status_seq();
641    if status_seq != *last_status_seq {
642        *last_status_seq = status_seq;
643        link.on_status_ping();
644    }
645    if ep0_signals.lifecycle_seq() != *last_lifecycle_seq {
646        let events = ep0_signals.take_lifecycle().await;
647        *last_lifecycle_seq = ep0_signals.lifecycle_seq();
648        for event in events {
649            link.on_ep0_event(event);
650        }
651    }
652}
653
654async fn drain_queue_batch(
655    runtime: &mut RuntimeState,
656    link: &mut LinkController,
657    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
658    data_plane_tx: &mpsc::UnboundedSender<DataPlaneEvent>,
659    queue_rx: &mut mpsc::Receiver<QueueEvent>,
660) -> Result<()> {
661    let mut processed = 0;
662    while processed < QUEUE_BATCH_MAX.saturating_sub(1) {
663        match queue_rx.try_recv() {
664            Ok(evt) => {
665                handle_queue_event(runtime, link, outstanding, data_plane_tx, evt).await?;
666                processed += 1;
667            }
668            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
669        }
670    }
671    if processed >= QUEUE_BATCH_MAX.saturating_sub(1) {
672        trace!(processed, "queue batch truncated; will continue next tick");
673    }
674    Ok(())
675}
676
677fn pop_next_outstanding(
678    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
679) -> Option<(u32, u16, u16, OutstandingRequest)> {
680    let (export_id, (queue_id, tag)) = outstanding.iter().find_map(|(export_id, reqs)| {
681        reqs.keys()
682            .next()
683            .map(|(queue, tag)| (*export_id, (*queue, *tag)))
684    })?;
685    let pending = outstanding
686        .get_mut(&export_id)
687        .and_then(|map| map.remove(&(queue_id, tag)))?;
688    if let Some(map) = outstanding.get(&export_id) {
689        if map.is_empty() {
690            outstanding.remove(&export_id);
691        }
692    }
693    Some((export_id, queue_id, tag, pending))
694}
695
/// Replay parked IO requests to the host, bounded by count and deadline.
///
/// Runs only when the link is online and an IO pump exists. A request whose
/// export vanished is completed with ENODEV; one from a stale device is
/// dropped; one whose device is not ready is re-parked and the pass stops
/// (the device likely needs reconcile first). A link error during replay also
/// re-parks and stops the pass.
async fn drain_outstanding_bounded(
    runtime: &mut RuntimeState,
    link: &mut LinkController,
    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
    deadline: Instant,
) -> Result<()> {
    if outstanding.is_empty() {
        return Ok(());
    }
    if link.state() != LinkState::Online {
        trace!(
            outstanding_exports = outstanding.len(),
            "link not online; deferring outstanding IO drain"
        );
        return Ok(());
    }
    let Some(pump) = runtime.io_pump.as_ref() else {
        trace!(
            outstanding_exports = outstanding.len(),
            "no gadget endpoints available; deferring outstanding IO drain"
        );
        return Ok(());
    };

    let mut processed = 0usize;
    while processed < OUTSTANDING_BATCH_MAX && Instant::now() < deadline {
        let Some((export_id, _queue_id, _tag, pending)) = pop_next_outstanding(outstanding) else {
            break;
        };
        // Export deleted while the request was parked: fail it back to ublk.
        let Some(ctrl) = runtime.exports.get(&export_id) else {
            let _ = pending.queues.complete_io(pending.request, -libc::ENODEV);
            continue;
        };
        // No device handle at all: re-park and try again next pass.
        let Some(handle) = ctrl.device_handle() else {
            park_request(
                outstanding,
                export_id,
                pending.dev_id,
                pending.queues.clone(),
                pending.request,
            );
            break;
        };
        // Device was replaced since the request was parked: drop it.
        if handle.dev_id() != pending.dev_id {
            trace!(
                export_id,
                stale_dev = pending.dev_id,
                current_dev = handle.dev_id(),
                "dropping outstanding for stale device"
            );
            continue;
        }
        // Device exists but isn't online yet: re-park and stop the pass.
        if !handle.is_online() {
            park_request(
                outstanding,
                export_id,
                pending.dev_id,
                pending.queues.clone(),
                pending.request,
            );
            break;
        }
        let Some(queues) = handle.queues() else {
            park_request(
                outstanding,
                export_id,
                pending.dev_id,
                pending.queues.clone(),
                pending.request,
            );
            break;
        };
        let req = pending.request;
        trace!(
            export_id,
            dev_id = pending.dev_id,
            queue = req.queue_id,
            tag = req.tag,
            "replaying outstanding IO to host"
        );
        // A failure here is a link problem, not a request problem: feed it to
        // the link state machine, re-park the request, and stop the pass.
        if let Err(err) = handle_request(pump.clone(), export_id, queues.clone(), req).await {
            let io_err = io_error_from_anyhow(&err);
            link.on_io_error(&io_err);
            park_request(
                outstanding,
                export_id,
                pending.dev_id,
                pending.queues.clone(),
                req,
            );
            warn!(
                export_id,
                queue = req.queue_id,
                tag = req.tag,
                error = ?err,
                "link error replaying outstanding IO; parked again"
            );
            break;
        }
        processed += 1;
    }

    if !outstanding.is_empty() {
        trace!(
            remaining_exports = outstanding.len(),
            processed,
            "outstanding drain truncated"
        );
    }
    Ok(())
}
807
/// Run export reconciliation until `deadline`, one export at a time.
///
/// Exports needing work are enqueued (deduplicated against the queue), then
/// each reconcile runs under a RECONCILE_TIMEOUT_MS timeout; a failure or
/// timeout marks the device failed so the controller backs off. A controller
/// is removed from the map for the duration of its reconcile so the context
/// can hold a mutable borrow of the state store.
async fn run_reconcile_slice(
    ublk: &mut SmooUblk,
    runtime: &mut RuntimeState,
    deadline: Instant,
) -> Result<()> {
    // Enqueue every export that currently wants reconciliation.
    let now = Instant::now();
    for (&export_id, ctrl) in runtime.exports.iter() {
        if ctrl.needs_reconcile(now) && !runtime.reconcile_queue.contains(&export_id) {
            runtime.reconcile_queue.push_back(export_id);
        }
    }

    while Instant::now() < deadline {
        let Some(export_id) = runtime.reconcile_queue.pop_front() else {
            break;
        };
        // Re-check: the need may have been satisfied while queued.
        let now = Instant::now();
        let needs_reconcile = runtime
            .exports
            .get(&export_id)
            .is_some_and(|ctrl| ctrl.needs_reconcile(now));
        if !needs_reconcile {
            continue;
        }

        let tunables = runtime.tunables;
        // Temporarily take the controller out of the map (see doc above).
        let mut controller = match runtime.exports.remove(&export_id) {
            Some(ctrl) => ctrl,
            None => continue,
        };
        {
            let mut cx = ExportReconcileContext {
                ublk,
                state_store: runtime.state_store(),
                tunables,
            };
            match tokio::time::timeout(
                Duration::from_millis(RECONCILE_TIMEOUT_MS),
                controller.reconcile(&mut cx),
            )
            .await
            {
                Ok(Ok(())) => {}
                Ok(Err(err)) => {
                    warn!(export_id, error = ?err, "reconcile failed; backing off");
                    controller.fail_device(format!("reconcile failed: {err:#}"));
                }
                Err(_) => {
                    warn!(export_id, "reconcile timed out; backing off");
                    controller.fail_device("reconcile timed out".to_string());
                }
            }
        }
        // Put the controller back; requeue it if it still has work pending.
        let needs_more = controller.needs_reconcile(Instant::now());
        runtime.exports.insert(export_id, controller);
        if needs_more {
            runtime.reconcile_queue.push_back(export_id);
        }

        if Instant::now() >= deadline {
            break;
        }
    }
    Ok(())
}
873
/// One maintenance pass: advance the link state machine, keep the data plane
/// alive, sync queue tasks, replay parked IO, optionally reconcile exports,
/// and publish the active-export count. The drain and reconcile phases share
/// a MAINTENANCE_SLICE_MS deadline so the pass stays bounded.
#[allow(clippy::too_many_arguments)]
async fn drive_runtime(
    ublk: &mut SmooUblk,
    runtime: &mut RuntimeState,
    link: &mut LinkController,
    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
    // None means new IO is no longer being accepted (see stop_accepting_new_io).
    queue_tx: Option<&QueueSender>,
    allow_reconcile: bool,
) -> Result<()> {
    let deadline = Instant::now() + Duration::from_millis(MAINTENANCE_SLICE_MS);
    link.tick(Instant::now());
    process_link_commands(runtime, link).await?;
    ensure_data_plane(runtime).await;
    if let Some(tx) = queue_tx {
        sync_queue_tasks(runtime, tx).await;
    }
    drain_outstanding_bounded(runtime, link, outstanding, deadline).await?;
    if allow_reconcile {
        run_reconcile_slice(ublk, runtime, deadline).await?;
    }
    // Keep the host-visible export count in step with reality.
    let active_count = count_active_exports(&runtime.exports);
    runtime.status().set_export_count(active_count).await;
    Ok(())
}
898
/// Apply a host-sent export configuration, then drop parked IO belonging to
/// exports that no longer exist and act on any resulting link commands.
async fn handle_config_message(
    ublk: &mut SmooUblk,
    runtime: &mut RuntimeState,
    link: &mut LinkController,
    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
    config: ConfigExportsV0,
) -> Result<()> {
    apply_config(ublk, runtime, config).await?;
    // Config may have removed exports; their parked IO is now unservable.
    prune_outstanding_for_missing_exports(outstanding, &runtime.exports);
    process_link_commands(runtime, link).await?;
    Ok(())
}
911
912async fn stop_accepting_new_io(runtime: &mut RuntimeState, queue_tx: &mut Option<QueueSender>) {
913    stop_all_queue_tasks(runtime).await;
914    *queue_tx = None;
915}
916
/// Shutdown progression for the event loop.
enum ShutdownState {
    /// Normal operation.
    Running,
    /// Draining in-flight work; forced once `deadline` passes.
    Graceful { deadline: Instant },
    /// Tear down immediately.
    Forceful,
}
922
/// Main event loop for the gadget: multiplexes shutdown signals, SIGHUP
/// recovery, data-plane events, CONFIG_EXPORTS messages from the control
/// plane, queue I/O events, and periodic maintenance, until shutdown is
/// complete or a fatal I/O error occurs.
async fn run_event_loop(
    ublk: &mut SmooUblk,
    mut runtime: RuntimeState,
    mut control_rx: mpsc::Receiver<ConfigExportsV0>,
    mut link: LinkController,
    ep0_signals: Ep0Signals,
    control_stop: watch::Sender<bool>,
) -> Result<()> {
    // `shutdown` is Some until the ctrl-c future has fired once.
    let mut shutdown = Some(Box::pin(signal::ctrl_c()));
    let mut hup = unix_signal(SignalKind::hangup()).context("install SIGHUP handler")?;
    let idle_sleep = tokio::time::sleep(Duration::from_millis(IDLE_INTERVAL_MS));
    tokio::pin!(idle_sleep);
    let mut liveness_tick = tokio::time::interval(Duration::from_millis(LIVENESS_INTERVAL_MS));
    liveness_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    // In-flight smoo requests keyed by export id, then (queue, tag).
    let mut outstanding: HashMap<u32, HashMap<(u16, u16), OutstandingRequest>> = HashMap::new();
    let (queue_tx_init, mut queue_rx) = mpsc::channel::<QueueEvent>(QUEUE_CHANNEL_DEPTH);
    // Set to None (dropping the sender) once we stop accepting new I/O.
    let mut queue_tx: Option<QueueSender> = Some(queue_tx_init);
    let (data_plane_tx, mut data_plane_rx) = mpsc::unbounded_channel::<DataPlaneEvent>();
    let ep0_notify = ep0_signals.notifier();

    let mut io_error = None;
    let mut exit_reason: Option<String> = None;
    let mut recovery_exit = false;
    let mut shutdown_state = ShutdownState::Running;
    let mut last_status_seq = ep0_signals.status_seq();
    let mut last_lifecycle_seq = ep0_signals.lifecycle_seq();

    loop {
        // Re-arm the idle timer at the top of every iteration.
        idle_sleep
            .as_mut()
            .reset(tokio::time::Instant::now() + Duration::from_millis(IDLE_INTERVAL_MS));

        drain_ep0_signals(
            &ep0_signals,
            &mut last_status_seq,
            &mut last_lifecycle_seq,
            &mut link,
        )
        .await;
        process_link_commands(&mut runtime, &mut link).await?;
        ensure_data_plane(&mut runtime).await;

        // Detect an io-pump task that died on its own and surface the
        // failure to the data plane for the current epoch.
        if runtime
            .io_pump_task
            .as_ref()
            .is_some_and(|task| task.is_finished())
        {
            if let Some(task) = runtime.io_pump_task.take() {
                let _ = task.await;
            }
            warn!(
                epoch = runtime.data_plane_epoch,
                "io pump task exited unexpectedly; notifying data plane"
            );
            notify_data_plane_error(
                &data_plane_tx,
                runtime.data_plane_epoch,
                io::Error::other("io pump exited"),
            );
        }

        // Promote an expired graceful shutdown to forceful before selecting.
        if let ShutdownState::Graceful { deadline } = shutdown_state {
            if Instant::now() >= deadline {
                warn!("graceful shutdown timed out; forcing shutdown");
                shutdown_state = ShutdownState::Forceful;
            }
        }

        if matches!(shutdown_state, ShutdownState::Forceful) {
            note_exit_reason(&mut exit_reason, "forceful shutdown state entered");
            break;
        }

        let ep0_notified = ep0_notify.notified();
        tokio::pin!(ep0_notified);
        // `biased` gives shutdown / SIGHUP priority over I/O events.
        tokio::select! { biased;
            _ = async {
                if let Some(fut) = shutdown.as_mut() {
                    let _ = fut.as_mut().await;
                }
            }, if shutdown.is_some() => {
                shutdown = None;
                match shutdown_state {
                    ShutdownState::Running => {
                        info!("shutdown signal received; entering graceful shutdown");
                        shutdown_state = ShutdownState::Graceful {
                            deadline: Instant::now() + Duration::from_millis(GRACEFUL_SHUTDOWN_TIMEOUT_MS),
                        };
                        stop_accepting_new_io(&mut runtime, &mut queue_tx).await;
                        let _ = control_stop.send(true);
                    }
                    ShutdownState::Graceful { .. } => {
                        warn!("second shutdown signal; forcing shutdown");
                        note_exit_reason(&mut exit_reason, "second shutdown signal received");
                        shutdown_state = ShutdownState::Forceful;
                        break;
                    }
                    ShutdownState::Forceful => break,
                }
            }
            Some(_) = hup.recv() => {
                // SIGHUP triggers user recovery (see `begin_user_recovery`);
                // on this exit path devices are left in place.
                info!("SIGHUP received; initiating user recovery");
                note_exit_reason(&mut exit_reason, "SIGHUP received; entering user recovery");
                let _ = control_stop.send(true);
                begin_user_recovery(ublk, &mut runtime).await?;
                recovery_exit = true;
                break;
            }
            event = data_plane_rx.recv() => {
                if let Some(event) = event {
                    if let Err(err) = handle_data_plane_event(
                        &mut runtime,
                        &mut link,
                        event,
                    )
                    .await
                    {
                        warn!(error = ?err, "run_event_loop: data plane event handling failed");
                        note_exit_reason(
                            &mut exit_reason,
                            format!("data plane event handling failed: {err:#}"),
                        );
                        io_error = Some(err);
                        break;
                    }
                }
            }
            // New export configuration is only accepted while fully running.
            Some(config) = control_rx.recv(), if matches!(shutdown_state, ShutdownState::Running) => {
                if let Err(err) = handle_config_message(
                    ublk,
                    &mut runtime,
                    &mut link,
                    &mut outstanding,
                    config,
                )
                .await
                {
                    warn!(error = ?err, "CONFIG_EXPORTS application failed");
                }
            }
            _ = ep0_notified.as_mut() => {
                // EP0 activity: loop back so drain_ep0_signals runs promptly.
                continue;
            }
            maybe_evt = queue_rx.recv(), if !matches!(shutdown_state, ShutdownState::Forceful) && runtime.io_pump.is_some() => {
                if let Some(evt) = maybe_evt {
                    if let Err(err) = handle_queue_event(
                        &mut runtime,
                        &mut link,
                        &mut outstanding,
                        &data_plane_tx,
                        evt,
                    )
                    .await
                    {
                        warn!(error = ?err, "run_event_loop: queue event handling failed");
                        note_exit_reason(
                            &mut exit_reason,
                            format!("queue event handling failed: {err:#}"),
                        );
                        io_error = Some(err);
                        break;
                    }
                    // Opportunistically drain any further queued events
                    // before driving the runtime once.
                    if let Err(err) = drain_queue_batch(
                        &mut runtime,
                        &mut link,
                        &mut outstanding,
                        &data_plane_tx,
                        &mut queue_rx,
                    )
                    .await
                    {
                        warn!(error = ?err, "run_event_loop: queue batch drain failed");
                        note_exit_reason(
                            &mut exit_reason,
                            format!("queue batch drain failed: {err:#}"),
                        );
                        io_error = Some(err);
                        break;
                    }
                    if let Err(err) = drive_runtime(
                        ublk,
                        &mut runtime,
                        &mut link,
                        &mut outstanding,
                        queue_tx.as_ref(),
                        false,
                    ).await {
                        warn!(error = ?err, "run_event_loop: drive_runtime failed after queue events");
                        note_exit_reason(
                            &mut exit_reason,
                            format!("drive_runtime after queue events failed: {err:#}"),
                        );
                        io_error = Some(err);
                        break;
                    }
                }
            }
            _ = liveness_tick.tick() => {
                if let Err(err) = drive_runtime(
                    ublk,
                    &mut runtime,
                    &mut link,
                    &mut outstanding,
                    queue_tx.as_ref(),
                    false,
                ).await {
                    warn!(error = ?err, "run_event_loop: drive_runtime failed on liveness tick");
                    note_exit_reason(
                        &mut exit_reason,
                        format!("drive_runtime on liveness tick failed: {err:#}"),
                    );
                    io_error = Some(err);
                    break;
                }
            }
            _ = &mut idle_sleep => {
                // Reconcile only while fully running, never during shutdown.
                let allow_reconcile = matches!(shutdown_state, ShutdownState::Running);
                if let Err(err) = drive_runtime(
                    ublk,
                    &mut runtime,
                    &mut link,
                    &mut outstanding,
                    queue_tx.as_ref(),
                    allow_reconcile,
                ).await {
                    warn!(error = ?err, "run_event_loop: drive_runtime failed on idle maintenance");
                    note_exit_reason(
                        &mut exit_reason,
                        format!("drive_runtime on idle maintenance failed: {err:#}"),
                    );
                    io_error = Some(err);
                    break;
                }
            }
        }

        // During graceful shutdown, keep driving until outstanding work and
        // the queue channel are both drained, or the deadline passes.
        if let ShutdownState::Graceful { deadline } = shutdown_state {
            if let Err(err) = drive_runtime(
                ublk,
                &mut runtime,
                &mut link,
                &mut outstanding,
                queue_tx.as_ref(),
                false,
            )
            .await
            {
                warn!(error = ?err, "run_event_loop: drive_runtime failed during graceful shutdown");
                note_exit_reason(
                    &mut exit_reason,
                    format!("drive_runtime during graceful shutdown failed: {err:#}"),
                );
                io_error = Some(err);
                break;
            }
            let outstanding_empty = outstanding.is_empty();
            // Closed + empty means no sender remains and nothing is buffered.
            let queue_drained = queue_rx.is_closed() && queue_rx.is_empty();
            if outstanding_empty && queue_drained {
                note_exit_reason(&mut exit_reason, "graceful shutdown complete");
                info!("graceful shutdown complete; exiting");
                break;
            }
            if Instant::now() >= deadline {
                warn!("graceful shutdown deadline reached; forcing shutdown");
                note_exit_reason(&mut exit_reason, "graceful shutdown deadline reached");
                shutdown_state = ShutdownState::Forceful;
                shutdown = None;
            }
        }
    }

    if let Some(reason) = exit_reason.as_deref() {
        warn!(%reason, recovery_exit, "event loop exiting");
    } else {
        info!(recovery_exit, "event loop exiting");
    }

    // Tear down the io pump before touching devices.
    if let Some(pump) = runtime.io_pump.take() {
        drop(pump);
    }
    if let Some(task) = runtime.io_pump_task.take() {
        task.abort();
        let _ = task.await;
    }

    // Recovery exit skips device cleanup and state-file removal so a
    // successor process can adopt them.
    if recovery_exit {
        return Ok(());
    }

    let _ = control_stop.send(true);
    let forceful = matches!(shutdown_state, ShutdownState::Forceful);
    info!(
        shutdown_reason = exit_reason.as_deref().unwrap_or("unspecified"),
        forceful, "cleaning up ublk devices"
    );
    cleanup_ublk_devices(ublk, &mut runtime, exit_reason.as_deref(), forceful).await?;
    runtime.status().set_export_count(0).await;

    if let Err(err) = runtime.state_store().remove_file() {
        warn!(error = ?err, "failed to remove state file on shutdown");
    } else {
        debug!("state file removed on shutdown");
    }

    if let Some(err) = io_error {
        Err(err)
    } else {
        Ok(())
    }
}
1234
1235async fn handle_request(
1236    pump: IoPumpHandle,
1237    export_id: u32,
1238    queues: Arc<UblkQueueRuntime>,
1239    req: UblkIoRequest,
1240) -> Result<()> {
1241    let block_size = queues.block_size();
1242    let req_len = match request_byte_len(&req, block_size) {
1243        Ok(len) => len,
1244        Err(err) => {
1245            let errno = errno_from_io(&err);
1246            warn!(
1247                queue = req.queue_id,
1248                tag = req.tag,
1249                errno = errno,
1250                ?req.op,
1251                "invalid request length: {err}"
1252            );
1253            queues
1254                .complete_io(req, -errno)
1255                .context("complete invalid request")?;
1256            return Ok(());
1257        }
1258    };
1259
1260    let opcode = match opcode_from_ublk(req.op) {
1261        Some(op) => op,
1262        None => {
1263            warn!(
1264                queue = req.queue_id,
1265                tag = req.tag,
1266                op = ?req.op,
1267                "unsupported ublk opcode"
1268            );
1269            queues
1270                .complete_io(req, -libc::EOPNOTSUPP)
1271                .context("complete unsupported opcode")?;
1272            return Ok(());
1273        }
1274    };
1275
1276    trace!(
1277        export_id,
1278        dev_id = queues.dev_id(),
1279        queue = req.queue_id,
1280        tag = req.tag,
1281        op = ?req.op,
1282        req_bytes = req_len,
1283        block_size,
1284        "handle_request begin"
1285    );
1286
1287    if matches!(opcode, OpCode::Read | OpCode::Write) && req_len > 0 {
1288        let capacity = queues.buffer_len();
1289        if req_len > capacity {
1290            warn!(
1291                queue = req.queue_id,
1292                tag = req.tag,
1293                req_bytes = req_len,
1294                buf_cap = capacity,
1295                "request exceeds buffer capacity"
1296            );
1297            queues
1298                .complete_io(req, -libc::EINVAL)
1299                .context("complete oversized request")?;
1300            return Ok(());
1301        }
1302    }
1303
1304    let num_blocks = u32::try_from(req_len / block_size)
1305        .context("request block count exceeds protocol limit")?;
1306    let request_id = make_request_id(req.queue_id, req.tag);
1307    let proto_req = Request::new(export_id, request_id, opcode, req.sector, num_blocks, 0);
1308    trace!(
1309        export_id,
1310        dev_id = queues.dev_id(),
1311        queue = req.queue_id,
1312        tag = req.tag,
1313        op = ?opcode,
1314        num_blocks,
1315        req_bytes = req_len,
1316        "dispatching smoo Request through pump"
1317    );
1318    let work = IoWork {
1319        ublk_request: req,
1320        request: proto_req,
1321        req_len,
1322        block_size,
1323        queue_id: req.queue_id,
1324        tag: req.tag,
1325        op: opcode,
1326        queues: queues.clone(),
1327    };
1328    pump.submit(work).await
1329}
1330
1331async fn control_loop(
1332    mut custom: Custom,
1333    handler: GadgetControl,
1334    status: GadgetStatusShared,
1335    signals: Ep0Signals,
1336    mut stop: watch::Receiver<bool>,
1337    tx: mpsc::Sender<ConfigExportsV0>,
1338) -> Result<()> {
1339    loop {
1340        tokio::select! {
1341            _ = stop.changed() => {
1342                debug!("control loop stopping on shutdown signal");
1343                return Ok(());
1344            }
1345            result = custom.wait_event() => {
1346                result.context("wait for FunctionFS event")?;
1347            }
1348        }
1349        let event = custom.event().context("read FunctionFS event")?;
1350        match event {
1351            usb_gadget::function::custom::Event::Bind => {
1352                debug!("FunctionFS bind event (control loop)");
1353                signals.push_lifecycle(Event::Bind).await;
1354            }
1355            usb_gadget::function::custom::Event::Unbind => {
1356                debug!("FunctionFS unbind event (control loop)");
1357                signals.push_lifecycle(Event::Unbind).await;
1358            }
1359            usb_gadget::function::custom::Event::Enable => {
1360                debug!("FunctionFS enable event (control loop)");
1361                signals.push_lifecycle(Event::Enable).await;
1362            }
1363            usb_gadget::function::custom::Event::Disable => {
1364                debug!("FunctionFS disable event (control loop)");
1365                signals.push_lifecycle(Event::Disable).await;
1366            }
1367            usb_gadget::function::custom::Event::Suspend => {
1368                debug!("FunctionFS suspend event (control loop)");
1369                signals.push_lifecycle(Event::Suspend).await;
1370            }
1371            usb_gadget::function::custom::Event::Resume => {
1372                debug!("FunctionFS resume event (control loop)");
1373                signals.push_lifecycle(Event::Resume).await;
1374            }
1375            usb_gadget::function::custom::Event::SetupDeviceToHost(sender) => {
1376                let report = status.report().await;
1377                let setup = setup_from_ctrl_req(sender.ctrl_req());
1378                let mut io = UsbControlIo::from_sender(sender);
1379                if let Err(err) = handler.handle_setup_packet(&mut io, setup, &report).await {
1380                    warn!(error = ?err, "vendor setup handling failed");
1381                    let _ = io.stall().await;
1382                } else if is_status_setup(&setup) {
1383                    signals.mark_status_ping();
1384                }
1385            }
1386            usb_gadget::function::custom::Event::SetupHostToDevice(receiver) => {
1387                let report = status.report().await;
1388                let setup = setup_from_ctrl_req(receiver.ctrl_req());
1389                let mut io = UsbControlIo::from_receiver(receiver);
1390                match handler.handle_setup_packet(&mut io, setup, &report).await {
1391                    Ok(Some(SetupCommand::Config(payload))) => match tx.try_send(payload) {
1392                        Ok(()) => {}
1393                        Err(TrySendError::Closed(_)) => {
1394                            warn!("CONFIG_EXPORTS channel closed; dropping payload");
1395                        }
1396                        Err(TrySendError::Full(_)) => {
1397                            warn!("CONFIG_EXPORTS channel full; dropping payload");
1398                        }
1399                    },
1400                    Ok(None) => {
1401                        if is_status_setup(&setup) {
1402                            signals.mark_status_ping();
1403                        }
1404                    }
1405                    Err(err) => {
1406                        warn!(error = ?err, "vendor setup handling failed");
1407                        let _ = io.stall().await;
1408                    }
1409                }
1410            }
1411            usb_gadget::function::custom::Event::Unknown(code) => {
1412                debug!(event = code, "FunctionFS unknown event");
1413            }
1414            _ => {}
1415        }
1416    }
1417}
1418
1419fn opcode_from_ublk(op: UblkOp) -> Option<OpCode> {
1420    match op {
1421        UblkOp::Read => Some(OpCode::Read),
1422        UblkOp::Write => Some(OpCode::Write),
1423        UblkOp::Flush => Some(OpCode::Flush),
1424        UblkOp::Discard => Some(OpCode::Discard),
1425        UblkOp::Unknown(_) => None,
1426    }
1427}
1428
/// Packs a (queue_id, tag) pair into the 32-bit request identifier used on
/// the smoo wire: queue id in the high 16 bits, tag in the low 16 bits.
fn make_request_id(queue_id: u16, tag: u16) -> u32 {
    (u32::from(queue_id) << 16) | u32::from(tag)
}
1432
1433fn request_byte_len(req: &UblkIoRequest, block_size: usize) -> io::Result<usize> {
1434    let sectors = usize::try_from(req.num_sectors)
1435        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "sector count overflow"))?;
1436    sectors
1437        .checked_mul(block_size)
1438        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "request byte length overflow"))
1439}
1440
1441fn errno_from_io(err: &io::Error) -> i32 {
1442    err.raw_os_error().unwrap_or_else(|| match err.kind() {
1443        io::ErrorKind::Unsupported => libc::EOPNOTSUPP,
1444        io::ErrorKind::PermissionDenied => libc::EACCES,
1445        io::ErrorKind::UnexpectedEof => libc::EIO,
1446        io::ErrorKind::NotFound => libc::ENOENT,
1447        io::ErrorKind::InvalidInput => libc::EINVAL,
1448        _ => libc::EIO,
1449    })
1450}
1451
1452fn setup_from_ctrl_req(ctrl: &CtrlReq) -> SetupPacket {
1453    SetupPacket::from_fields(
1454        ctrl.request_type,
1455        ctrl.request,
1456        ctrl.value,
1457        ctrl.index,
1458        ctrl.length,
1459    )
1460}
1461
1462fn is_status_setup(setup: &SetupPacket) -> bool {
1463    setup.request() == SMOO_STATUS_REQUEST && setup.request_type() == SMOO_STATUS_REQ_TYPE
1464}
1465
/// Direction-specific half of an EP0 control transfer. The `Option` makes
/// the sender/receiver single-use: it is `take()`n on first consumption.
enum UsbControlInner<'a> {
    // IN (device-to-host) transfer; holds the response sender.
    In(Option<CtrlSender<'a>>),
    // OUT (host-to-device) transfer; holds the payload receiver.
    Out(Option<CtrlReceiver<'a>>),
}
1470
/// Adapter exposing a single EP0 control transfer through the `ControlIo`
/// trait used by the gadget's setup-packet handler.
struct UsbControlIo<'a> {
    inner: UsbControlInner<'a>,
}
1474
impl<'a> UsbControlIo<'a> {
    /// Wraps the sender side of a device-to-host (IN) setup transfer.
    fn from_sender(sender: CtrlSender<'a>) -> Self {
        Self {
            inner: UsbControlInner::In(Some(sender)),
        }
    }

    /// Wraps the receiver side of a host-to-device (OUT) setup transfer.
    fn from_receiver(receiver: CtrlReceiver<'a>) -> Self {
        Self {
            inner: UsbControlInner::Out(Some(receiver)),
        }
    }
}
1488
#[async_trait::async_trait]
impl ControlIo for UsbControlIo<'_> {
    // Sends the device-to-host payload on an IN transfer. The sender is
    // single-use: a second call fails with "control sender already used".
    // NOTE(review): on an OUT transfer this is a silent no-op (Ok), while
    // read_out on an IN transfer is a hard error — confirm the asymmetry
    // is intentional.
    async fn write_in(&mut self, data: &[u8]) -> Result<()> {
        match &mut self.inner {
            UsbControlInner::In(sender) => {
                let sender = sender.take().context("control sender already used")?;
                sender
                    .send(data)
                    .with_context(|| format!("send control response of {} bytes", data.len()))
                    .map(|_| ())
            }
            UsbControlInner::Out(_) => Ok(()),
        }
    }

    // Reads the host-to-device payload on an OUT transfer; requires that
    // exactly `buf.len()` bytes arrive, otherwise reports truncation.
    async fn read_out(&mut self, buf: &mut [u8]) -> Result<()> {
        match &mut self.inner {
            UsbControlInner::Out(receiver) => {
                let receiver = receiver.take().context("control receiver already used")?;
                let read = receiver
                    .recv(buf)
                    .with_context(|| format!("read control payload of {} bytes", buf.len()))?;
                ensure!(read == buf.len(), "control payload truncated");
                Ok(())
            }
            UsbControlInner::In(_) => Err(anyhow!("attempted to read_out on IN control transfer")),
        }
    }

    // Halts (stalls) the transfer for whichever direction it has, consuming
    // the single-use sender/receiver.
    async fn stall(&mut self) -> Result<()> {
        match &mut self.inner {
            UsbControlInner::In(sender) => {
                let sender = sender.take().context("control sender already used")?;
                sender.halt().context("stall control sender")
            }
            UsbControlInner::Out(receiver) => {
                let receiver = receiver.take().context("control receiver already used")?;
                receiver.halt().context("stall control receiver")
            }
        }
    }
}
1531async fn initialize_session(_ublk: &mut SmooUblk, state_store: &mut StateStore) -> Result<()> {
1532    if state_store.records().is_empty() {
1533        if state_store.path().is_some() {
1534            debug!("state file present but no exports recorded; nothing to recover");
1535        }
1536        return Ok(());
1537    }
1538
1539    let mut seen = HashSet::new();
1540    let mut reset = false;
1541    for record in state_store.records() {
1542        if !seen.insert(record.export_id) {
1543            warn!(
1544                export_id = record.export_id,
1545                "state file contains duplicate export_id; clearing state"
1546            );
1547            reset = true;
1548            break;
1549        }
1550        if let Err(err) = validate_persisted_record(record) {
1551            warn!(
1552                export_id = record.export_id,
1553                error = ?err,
1554                "state file entry invalid; clearing state"
1555            );
1556            reset = true;
1557            break;
1558        }
1559    }
1560
1561    if reset {
1562        reset_state_store(state_store);
1563        let _ = state_store.persist();
1564    }
1565    Ok(())
1566}
1567
/// Prepares to adopt ublk devices recorded in the state file.
///
/// Queries the owner pid of every recorded device, then:
/// - stale devices with no surviving owner → reset state and start fresh;
/// - more than one live owner → reset state and bail (ambiguous);
/// - exactly one live owner → send it SIGHUP and wait for it to exit so
///   this process can adopt its devices.
async fn adopt_prepare(ublk: &mut SmooUblk, state_store: &mut StateStore) -> Result<()> {
    let mut dev_ids = Vec::new();
    let mut owner_pids = HashSet::new();
    let mut stale_devices = false;
    for record in state_store.records() {
        if let Some(dev_id) = record.assigned_dev_id {
            dev_ids.push(dev_id);
            match ublk.owner_pid(dev_id).await {
                Ok(pid) => {
                    let alive = pid_is_alive(pid);
                    debug!(dev_id, pid, alive, "queried ublk owner");
                    // A live owner that is not us is a candidate to hand over.
                    if pid > 0 && pid != unsafe { libc::getpid() } && alive {
                        owner_pids.insert(pid);
                    } else if pid > 0 && !alive {
                        // Device still registered to a dead process.
                        stale_devices = true;
                    }
                }
                Err(err) => {
                    let missing = error_is_missing(&err);
                    warn!(dev_id, error = ?err, missing, "query owner pid failed");
                    if missing {
                        stale_devices = true;
                    }
                }
            }
        }
    }

    // Leftover devices with no surviving owner: start from a clean slate.
    if stale_devices && owner_pids.is_empty() {
        warn!("no surviving owners and stale devices detected; resetting state for fresh session");
        reset_state_store(state_store);
        if let Err(err) = state_store.persist() {
            warn!(error = ?err, "persist state reset failed");
        }
        return Ok(());
    }

    // Two or more live owners cannot be disambiguated: reset and abort.
    if owner_pids.len() > 1 {
        warn!(
            owners = ?owner_pids,
            "multiple ublk owners detected; resetting state for clean session"
        );
        reset_state_store(state_store);
        if let Err(err) = state_store.persist() {
            warn!(error = ?err, "persist state reset failed");
        }
        anyhow::bail!("multiple ublk owners detected during adopt");
    }

    // Exactly one survivor: ask it to enter recovery, then wait for it to
    // release the devices before this process takes over.
    if let Some(pid) = owner_pids.into_iter().next() {
        info!(pid, "signaling existing smoo-gadget owner for recovery");
        // SAFETY: libc::kill has no memory-safety preconditions; note a
        // recycled pid could receive a spurious SIGHUP — TODO confirm this
        // race is acceptable here.
        unsafe {
            libc::kill(pid, libc::SIGHUP);
        }
        info!(pid, "waiting for prior owner to exit before adopting");
        wait_for_owner_exit(ublk, &dev_ids, pid, Duration::from_secs(3)).await?;
    }

    Ok(())
}
1628
1629async fn wait_for_owner_exit(
1630    ublk: &mut SmooUblk,
1631    dev_ids: &[u32],
1632    target_pid: i32,
1633    timeout: Duration,
1634) -> Result<()> {
1635    let deadline = Instant::now() + timeout;
1636    loop {
1637        let mut still_owned = false;
1638        for dev_id in dev_ids {
1639            match ublk.owner_pid(*dev_id).await {
1640                Ok(pid) => {
1641                    debug!(dev_id, pid, target_pid, "owner check during adopt wait");
1642                    if pid == target_pid {
1643                        still_owned = true;
1644                    } else if pid > 0 && pid != target_pid {
1645                        anyhow::bail!(
1646                            "device {dev_id} now owned by unexpected pid {pid} during adopt"
1647                        );
1648                    }
1649                }
1650                Err(err) => {
1651                    warn!(dev_id, error = ?err, "owner pid query failed during adopt wait");
1652                }
1653            }
1654        }
1655        if !still_owned {
1656            return Ok(());
1657        }
1658        if Instant::now() >= deadline {
1659            anyhow::bail!("owner pid {target_pid} still active after adopt wait");
1660        }
1661        tokio::time::sleep(Duration::from_millis(100)).await;
1662    }
1663}
1664
1665fn reset_state_store(state_store: &mut StateStore) {
1666    let path = state_store.path().map(Path::to_path_buf);
1667    *state_store = match path {
1668        Some(path) => StateStore::new_with_path(path),
1669        None => StateStore::new(),
1670    };
1671}
1672
1673fn count_active_exports(exports: &HashMap<u32, ExportController>) -> u32 {
1674    exports
1675        .values()
1676        .filter(|ctrl| ctrl.is_active_for_status())
1677        .count() as u32
1678}
1679
/// Applies a CONFIG_EXPORTS payload: stops exports that were removed or
/// whose spec changed, creates controllers for new ones, rewrites the
/// persisted state, and refreshes the reported export count.
async fn apply_config(
    ublk: &mut SmooUblk,
    runtime: &mut RuntimeState,
    config: ConfigExportsV0,
) -> Result<()> {
    let entries = config.entries();
    let desired_records = if entries.is_empty() {
        Vec::new()
    } else {
        config_entries_to_records(entries)?
    };

    // Fast-path: zero exports means tear everything down.
    if desired_records.is_empty() {
        for controller in runtime.exports.values_mut() {
            if let Some((ctrl, queues)) = controller.take_device_handles() {
                ublk.stop_dev(SmooUblkDevice::from_parts(ctrl, queues), true)
                    .await
                    .context("stop ublk device before applying CONFIG_EXPORTS")?;
            }
        }
        runtime.exports.clear();
        runtime.reconcile_queue.clear();
        runtime.state_store().replace_all(Vec::new());
        // Persist failure is logged, not fatal.
        if let Err(err) = runtime.state_store().persist() {
            warn!(error = ?err, "failed to clear state file");
        }
        runtime.status().set_export_count(0).await;
        return Ok(());
    }

    let desired_specs: HashMap<u32, ExportSpec> = desired_records
        .iter()
        .map(|record| (record.export_id, record.spec.clone()))
        .collect();

    // Stop and remove exports that are missing or whose geometry changed.
    let mut to_remove = Vec::new();
    for (export_id, controller) in runtime.exports.iter() {
        match desired_specs.get(export_id) {
            // Identical spec: keep the controller (and its device) alive.
            Some(spec) if spec == &controller.spec => {}
            _ => to_remove.push(*export_id),
        }
    }
    for export_id in to_remove {
        if let Some(mut controller) = runtime.exports.remove(&export_id) {
            if let Some((ctrl, queues)) = controller.take_device_handles() {
                ublk.stop_dev(SmooUblkDevice::from_parts(ctrl, queues), true)
                    .await
                    .with_context(|| format!("stop ublk device for export {export_id}"))?;
            }
        }
    }

    // Create controllers for any new exports.
    for record in &desired_records {
        runtime.exports.entry(record.export_id).or_insert_with(|| {
            ExportController::new(record.export_id, record.spec.clone(), ExportState::New)
        });
    }

    // Rebuild state store with the desired exports, keeping any assigned dev_ids
    // for controllers we kept alive.
    let mut new_records = Vec::with_capacity(desired_records.len());
    for mut record in desired_records {
        if let Some(ctrl) = runtime.exports.get(&record.export_id) {
            record.assigned_dev_id = ctrl.dev_id();
        }
        new_records.push(record);
    }

    runtime.state_store().replace_all(new_records);
    if let Err(err) = runtime.state_store().persist() {
        warn!(error = ?err, "failed to write state store");
    }
    // Drop queued reconcile work for exports that no longer exist.
    runtime
        .reconcile_queue
        .retain(|export_id| runtime.exports.contains_key(export_id));
    runtime
        .status()
        .set_export_count(count_active_exports(&runtime.exports))
        .await;
    Ok(())
}
1764
/// Keeps the registered configfs gadget alive for as long as the guard lives.
///
/// The field is held only for its drop side effects: dropping the inner
/// [`RegGadget`] releases the registration. NOTE(review): exact teardown
/// semantics (e.g. whether drop also unbinds from the UDC) are defined by the
/// `usb_gadget` crate — confirm there before relying on them.
struct GadgetGuard {
    _registration: RegGadget,
}
1768
/// Prepare the USB gadget function and open its FunctionFS data endpoints.
///
/// Two modes:
/// * If `--ffs-dir` was supplied, attach to that already-mounted FunctionFS
///   directory and skip all configfs/UDC work (no [`GadgetGuard`] returned).
/// * Otherwise remove every existing gadget, build and register a new gadget
///   through configfs, bind it to the default UDC, and return a
///   [`GadgetGuard`] that keeps the registration alive.
///
/// Returns the control-endpoint owner (`Custom`), the opened data endpoints,
/// the optional registration guard, and the FunctionFS directory path.
fn setup_configfs(
    args: &Args,
) -> Result<(Custom, FunctionfsEndpoints, Option<GadgetGuard>, PathBuf)> {
    if let Some(ffs_dir) = args.ffs_dir.as_ref() {
        info!(
            ffs_dir = %ffs_dir.display(),
            "using existing FunctionFS directory; skipping configfs setup"
        );
        let custom = configfs_builder(args)
            .existing(ffs_dir)
            .context("initialize FunctionFS in existing directory")?;
        let endpoints = open_data_endpoints(ffs_dir)?;
        return Ok((custom, endpoints, None, ffs_dir.clone()));
    }

    // Destructive: unregisters every gadget on the system, not just ours.
    usb_gadget::remove_all().context("remove existing USB gadgets")?;
    let (mut custom, handle) = configfs_builder(args).build();

    let (subclass, protocol) = interface_identity(args);
    let klass = Class::new(SMOO_CLASS, subclass, protocol);
    let id = Id::new(args.vendor_id, args.product_id);
    let strings = Strings::new("smoo", "smoo gadget", "0001");
    let udc = usb_gadget::default_udc().context("locate UDC")?;
    let gadget =
        Gadget::new(klass, id, strings).with_config(Config::new("config").with_function(handle));
    let reg = gadget.register().context("register gadget")?;

    // The FunctionFS dir is resolvable only after registration; data endpoint
    // files are opened after binding to the UDC. NOTE(review): this ordering
    // looks load-bearing (ep files appear once descriptors are written and the
    // gadget is bound) — confirm before reordering.
    let ffs_dir = custom.ffs_dir().context("resolve FunctionFS dir")?;
    reg.bind(Some(&udc)).context("bind gadget to UDC")?;

    let endpoints = open_data_endpoints(&ffs_dir)?;

    Ok((
        custom,
        endpoints,
        Some(GadgetGuard { _registration: reg }),
        ffs_dir,
    ))
}
1808
1809fn configfs_builder(args: &Args) -> CustomBuilder {
1810    let (subclass, protocol) = interface_identity(args);
1811    Custom::builder().with_interface(
1812        Interface::new(Class::vendor_specific(subclass, protocol), "smoo")
1813            .with_endpoint(interrupt_in_ep())
1814            .with_endpoint(interrupt_out_ep())
1815            .with_endpoint(bulk_in_ep())
1816            .with_endpoint(bulk_out_ep()),
1817    )
1818}
1819
1820fn interface_identity(args: &Args) -> (u8, u8) {
1821    if args.mimic_fastboot {
1822        (FASTBOOT_SUBCLASS, FASTBOOT_PROTOCOL)
1823    } else {
1824        (SMOO_SUBCLASS, SMOO_PROTOCOL)
1825    }
1826}
1827
1828fn open_data_endpoints(ffs_dir: &Path) -> Result<FunctionfsEndpoints> {
1829    let interrupt_in = open_endpoint_fd(ffs_dir.join("ep1")).context("open interrupt IN")?;
1830    let interrupt_out = open_endpoint_fd(ffs_dir.join("ep2")).context("open interrupt OUT")?;
1831    let bulk_in = open_endpoint_fd(ffs_dir.join("ep3")).context("open bulk IN")?;
1832    let bulk_out = open_endpoint_fd(ffs_dir.join("ep4")).context("open bulk OUT")?;
1833    Ok(FunctionfsEndpoints::new(
1834        interrupt_in,
1835        interrupt_out,
1836        bulk_in,
1837        bulk_out,
1838    ))
1839}
1840
1841fn interrupt_in_ep() -> Endpoint {
1842    let (_, dir) = EndpointDirection::device_to_host();
1843    make_ep(dir, TransferType::Interrupt, 1024)
1844}
1845
1846fn interrupt_out_ep() -> Endpoint {
1847    let (_, dir) = EndpointDirection::host_to_device();
1848    make_ep(dir, TransferType::Interrupt, 1024)
1849}
1850
1851fn bulk_in_ep() -> Endpoint {
1852    let (_, dir) = EndpointDirection::device_to_host();
1853    make_ep(dir, TransferType::Bulk, 512)
1854}
1855
1856fn bulk_out_ep() -> Endpoint {
1857    let (_, dir) = EndpointDirection::host_to_device();
1858    make_ep(dir, TransferType::Bulk, 512)
1859}
1860
1861fn make_ep(direction: EndpointDirection, ty: TransferType, packet_size: u16) -> Endpoint {
1862    let mut ep = match ty {
1863        TransferType::Bulk => Endpoint::bulk(direction),
1864        _ => Endpoint::custom(direction, ty),
1865    };
1866    ep.max_packet_size_hs = packet_size;
1867    ep.max_packet_size_ss = packet_size;
1868    if matches!(ty, TransferType::Interrupt) {
1869        ep.interval = 1;
1870    }
1871    ep
1872}
1873
1874fn open_endpoint_fd(path: PathBuf) -> Result<OwnedFd> {
1875    let file = File::options()
1876        .read(true)
1877        .write(true)
1878        .open(&path)
1879        .with_context(|| format!("open {}", path.display()))?;
1880    Ok(to_owned_fd(file))
1881}
1882
/// Convert a `File` into an `OwnedFd` without closing the descriptor.
///
/// std provides a safe `From<File> for OwnedFd` conversion (stable since Rust
/// 1.63), so the previous `into_raw_fd`/`from_raw_fd` round-trip — and its
/// `unsafe` block — is unnecessary.
fn to_owned_fd(file: File) -> OwnedFd {
    OwnedFd::from(file)
}
1887
/// Tear down every ublk device the runtime owns.
///
/// Queue tasks are stopped first (aborted when `forceful`, otherwise shut
/// down cooperatively) so nothing races the device teardown below. Then each
/// export controller is drained:
/// * `forceful`: device handles are dropped immediately and the device is
///   queued for force-removal.
/// * graceful: `stop_dev` is attempted; on failure the device falls back to
///   force-removal.
/// Controllers holding only a remembered dev_id (no live handles) are always
/// force-removed, since a kernel device may still exist for them.
async fn cleanup_ublk_devices(
    ublk: &mut SmooUblk,
    runtime: &mut RuntimeState,
    shutdown_reason: Option<&str>,
    forceful: bool,
) -> Result<()> {
    // Stop all per-queue tasks before touching the devices they serve.
    for (_, tasks) in runtime.queue_tasks.drain() {
        if forceful {
            tasks.abort();
        } else {
            tasks.shutdown().await;
        }
    }
    let mut force_remove_ids = Vec::new();
    for controller in runtime.exports.values_mut() {
        if let Some((ctrl, queues)) = controller.take_device_handles() {
            let dev_id = ctrl.dev_id();
            if forceful {
                info!(
                    dev_id,
                    shutdown_reason = shutdown_reason.unwrap_or("unspecified"),
                    "forceful shutdown: dropping ublk device handles"
                );
                // NOTE(review): dropping the reassembled device is assumed not
                // to remove the kernel device itself; the force-removal pass
                // below handles that — confirm against SmooUblkDevice's Drop.
                drop(SmooUblkDevice::from_parts(ctrl, queues));
                force_remove_ids.push(dev_id);
            } else {
                info!(
                    dev_id,
                    shutdown_reason = shutdown_reason.unwrap_or("unspecified"),
                    "stopping ublk device"
                );
                // Graceful stop failure is not fatal here; fall back to
                // force-removal instead of propagating the error.
                if let Err(err) = ublk
                    .stop_dev(SmooUblkDevice::from_parts(ctrl, queues), true)
                    .await
                {
                    warn!(
                        dev_id,
                        error = ?err,
                        "graceful stop failed; will force-remove"
                    );
                    force_remove_ids.push(dev_id);
                }
            }
        } else if let Some(dev_id) = controller.dev_id() {
            // No live handles, but the controller remembers a device id.
            force_remove_ids.push(dev_id);
        }
    }

    // force_remove_with_retry loops until success or ENOENT, so in practice
    // this propagates no error today.
    for dev_id in force_remove_ids {
        force_remove_with_retry(ublk, dev_id).await?;
    }
    Ok(())
}
1941
1942async fn force_remove_with_retry(ublk: &mut SmooUblk, dev_id: u32) -> Result<()> {
1943    let mut attempt: u32 = 0;
1944    loop {
1945        attempt = attempt.wrapping_add(1);
1946        match ublk.force_remove_device(dev_id).await {
1947            Ok(()) => {
1948                info!(dev_id, attempt, "force-removed ublk device");
1949                break;
1950            }
1951            Err(err) => {
1952                if error_is_errno(&err, libc::ENOENT) {
1953                    info!(dev_id, attempt, "ublk device already absent");
1954                    break;
1955                }
1956                warn!(
1957                    dev_id,
1958                    attempt,
1959                    error = ?err,
1960                    "force-remove ublk device failed; retrying"
1961                );
1962            }
1963        }
1964        tokio::time::sleep(Duration::from_millis(250)).await;
1965    }
1966    Ok(())
1967}
1968
1969async fn begin_user_recovery(ublk: &mut SmooUblk, runtime: &mut RuntimeState) -> Result<()> {
1970    ublk.preserve_devices_on_drop();
1971    for (_, tasks) in runtime.queue_tasks.drain() {
1972        tasks.shutdown().await;
1973    }
1974    let mut dev_ids = Vec::new();
1975    for ctrl in runtime.exports.values_mut() {
1976        if let Some((ctrl, queues)) = ctrl.take_device_handles() {
1977            dev_ids.push(ctrl.dev_id());
1978            drop(SmooUblkDevice::from_parts(ctrl, queues));
1979        } else if let Some(dev_id) = ctrl.dev_id() {
1980            dev_ids.push(dev_id);
1981        }
1982    }
1983    for dev_id in dev_ids {
1984        if let Err(err) = ublk.start_user_recovery(dev_id).await {
1985            warn!(dev_id, error = ?err, "start user recovery failed");
1986        }
1987    }
1988    Ok(())
1989}
1990
1991async fn process_link_commands(
1992    runtime: &mut RuntimeState,
1993    link: &mut LinkController,
1994) -> Result<()> {
1995    if let Some(LinkCommand::Fatal) = link.take_command() {
1996        let reason = link.last_offline_reason();
1997        let active_exports = count_active_exports(&runtime.exports);
1998        warn!(
1999            ?reason,
2000            state = ?link.state(),
2001            active_exports,
2002            "link controller emitted fatal command; keeping ublk runtime alive"
2003        );
2004    }
2005    Ok(())
2006}
2007
2008async fn handle_data_plane_event(
2009    runtime: &mut RuntimeState,
2010    link: &mut LinkController,
2011    event: DataPlaneEvent,
2012) -> Result<()> {
2013    match event {
2014        DataPlaneEvent::IoError { epoch, error } => {
2015            if epoch != runtime.data_plane_epoch {
2016                trace!(
2017                    event_epoch = epoch,
2018                    current_epoch = runtime.data_plane_epoch,
2019                    "ignoring stale data plane error"
2020                );
2021                return Ok(());
2022            }
2023            warn!(
2024                error = ?error,
2025                event_epoch = epoch,
2026                current_epoch = runtime.data_plane_epoch,
2027                "data plane I/O error; requesting link offline"
2028            );
2029            link.on_io_error(&error);
2030        }
2031    }
2032    process_link_commands(runtime, link).await?;
2033    Ok(())
2034}
2035
/// Handle an event emitted by a per-device queue task.
///
/// `Request` events are either dispatched to the host via the IO pump, or
/// "parked" in `outstanding` for replay once the link/device becomes ready.
/// `QueueError` events mark the export failed, complete all of its parked
/// requests with ENOLINK, and push the link offline.
async fn handle_queue_event(
    runtime: &mut RuntimeState,
    link: &mut LinkController,
    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
    _data_plane_tx: &mpsc::UnboundedSender<DataPlaneEvent>,
    event: QueueEvent,
) -> Result<()> {
    match event {
        QueueEvent::Request {
            export_id,
            dev_id,
            request,
            queues,
        } => {
            // Unknown export: the request belongs to something we no longer track.
            let Some(ctrl) = runtime.exports.get_mut(&export_id) else {
                return Ok(());
            };
            // No device handle yet (device being (re)built): park for replay.
            let Some(handle) = ctrl.device_handle() else {
                park_request(outstanding, export_id, dev_id, queues.clone(), request);
                return Ok(());
            };
            // A mismatched dev_id means this event raced a device recreation.
            if handle.dev_id() != dev_id {
                trace!(export_id, dev_id, "dropping request for stale device id");
                return Ok(());
            }
            let handle_ready = matches!(
                handle,
                DeviceHandle::Online { .. } | DeviceHandle::Starting { .. }
            );
            // Dispatch requires: link online, gadget present, handle usable.
            // Anything else parks the request instead of failing it.
            if !matches!(link.state(), LinkState::Online)
                || runtime.gadget.is_none()
                || !handle_ready
            {
                trace!(
                    export_id,
                    queue = request.queue_id,
                    tag = request.tag,
                    "link not online; parking IO"
                );
                park_request(outstanding, export_id, dev_id, queues.clone(), request);
                return Ok(());
            }
            let Some(pump) = runtime.io_pump.as_ref() else {
                park_request(outstanding, export_id, dev_id, queues.clone(), request);
                return Ok(());
            };
            trace!(
                export_id,
                dev_id,
                queue = request.queue_id,
                tag = request.tag,
                op = ?request.op,
                sector = request.sector,
                num_sectors = request.num_sectors,
                "dispatch ublk request to host"
            );
            // On dispatch failure the request is parked (not completed) so it
            // can be replayed after the link recovers; the link is pushed
            // offline based on the underlying io::Error.
            if let Err(err) = handle_request(pump.clone(), export_id, queues.clone(), request).await
            {
                let io_err = io_error_from_anyhow(&err);
                link.on_io_error(&io_err);
                park_request(outstanding, export_id, dev_id, queues.clone(), request);
                warn!(
                    export_id,
                    dev_id,
                    queue = request.queue_id,
                    tag = request.tag,
                    io_kind = ?io_err.kind(),
                    io_errno = io_err.raw_os_error(),
                    error = ?err,
                    "request dispatch failed; parked and forcing link offline"
                );
            }
        }
        QueueEvent::QueueError {
            export_id,
            dev_id,
            error,
        } => {
            warn!(
                export_id,
                dev_id,
                error = ?error,
                "queue task error; marking export failed and forcing link offline"
            );
            if let Some(ctrl) = runtime.exports.get_mut(&export_id) {
                ctrl.fail_device(format!("device {dev_id} queue task error: {error:#}"));
            }
            // Complete everything parked for this export with ENOLINK so the
            // kernel does not wait forever on requests we can never serve.
            if let Some(mut pending) = outstanding.remove(&export_id) {
                for ((_queue_id, _tag), req) in pending.drain() {
                    let _ = req.queues.complete_io(req.request, -libc::ENOLINK);
                }
            }
            link.on_io_error(&io::Error::other("queue task error"));
        }
    }
    Ok(())
}
2133
2134fn park_request(
2135    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
2136    export_id: u32,
2137    dev_id: u32,
2138    queues: Arc<UblkQueueRuntime>,
2139    req: UblkIoRequest,
2140) {
2141    let entry = outstanding.entry(export_id).or_default();
2142    entry.insert(
2143        (req.queue_id, req.tag),
2144        OutstandingRequest {
2145            dev_id,
2146            request: req,
2147            queues,
2148        },
2149    );
2150}
2151
2152fn prune_outstanding_for_missing_exports(
2153    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
2154    exports: &HashMap<u32, ExportController>,
2155) {
2156    let mut to_fail = Vec::new();
2157    for export_id in outstanding.keys() {
2158        if !exports.contains_key(export_id) {
2159            to_fail.push(*export_id);
2160        }
2161    }
2162    for export_id in to_fail {
2163        if let Some(mut pending) = outstanding.remove(&export_id) {
2164            for ((_queue_id, _tag), req) in pending.drain() {
2165                let _ = req.queues.complete_io(req.request, -libc::ENODEV);
2166            }
2167        }
2168    }
2169}
2170
2171fn io_error_from_anyhow(err: &anyhow::Error) -> io::Error {
2172    if let Some(cause) = err
2173        .chain()
2174        .find_map(|cause| cause.downcast_ref::<io::Error>())
2175    {
2176        io::Error::new(cause.kind(), cause.to_string())
2177    } else {
2178        io::Error::other(err.to_string())
2179    }
2180}
2181
/// Record the first exit reason; later calls are no-ops, so the earliest
/// recorded cause wins.
fn note_exit_reason(exit_reason: &mut Option<String>, reason: impl Into<String>) {
    let _ = exit_reason.get_or_insert_with(|| reason.into());
}
2187
2188fn error_is_errno(err: &anyhow::Error, code: i32) -> bool {
2189    err.chain()
2190        .find_map(|cause| cause.downcast_ref::<std::io::Error>())
2191        .and_then(|io_err| io_err.raw_os_error())
2192        == Some(code)
2193}
2194
2195fn error_is_missing(err: &anyhow::Error) -> bool {
2196    err.chain()
2197        .find_map(|cause| cause.downcast_ref::<std::io::Error>())
2198        .and_then(|io_err| io_err.raw_os_error())
2199        .is_some_and(|code| code == libc::ENOENT || code == libc::EINVAL)
2200}
2201
2202fn pid_is_alive(pid: i32) -> bool {
2203    if pid <= 0 {
2204        return false;
2205    }
2206    let res = unsafe { libc::kill(pid, 0) };
2207    if res == 0 {
2208        return true;
2209    }
2210    let err = std::io::Error::last_os_error();
2211    !matches!(err.raw_os_error(), Some(libc::ESRCH))
2212}
2213
/// Parse a u16 from hex, with an optional single `0x`/`0X` prefix.
///
/// Fix: the previous `trim_start_matches("0x")` stripped *repeated* prefixes,
/// so malformed input like `"0x0x1f"` parsed successfully. `strip_prefix`
/// removes at most one prefix, rejecting such input.
fn parse_hex_u16(input: &str) -> Result<u16, String> {
    let digits = input
        .strip_prefix("0x")
        .or_else(|| input.strip_prefix("0X"))
        .unwrap_or(input);
    u16::from_str_radix(digits, 16).map_err(|err| err.to_string())
}
2218
/// Sanity-check a record loaded from the state store before acting on it.
///
/// Rejects zero export ids, block sizes that are not powers of two within
/// [512, 65536], zero or misaligned total sizes, and block counts that do
/// not fit in `usize`.
fn validate_persisted_record(record: &PersistedExportRecord) -> Result<()> {
    ensure!(
        record.export_id != 0,
        "persisted export_id must be non-zero"
    );
    let block_size = record.spec.block_size;
    ensure!(
        block_size.is_power_of_two(),
        "persisted block size must be power-of-two"
    );
    ensure!(
        (512..=65536).contains(&block_size),
        "persisted block size out of range"
    );
    ensure!(
        record.spec.size_bytes != 0,
        "persisted export size_bytes must be non-zero"
    );
    ensure!(
        record.spec.size_bytes.is_multiple_of(block_size as u64),
        "persisted export size_bytes must be multiple of block_size"
    );
    // NOTE(review): checked_div can only fail on a zero divisor, which the
    // range check above already excludes, so the "smaller than block_size"
    // context is misleading; likewise the `blocks > 0` ensure below is
    // unreachable given the non-zero + multiple-of checks. Both are harmless
    // belt-and-braces, but the messages should not be trusted as reachable.
    let blocks = record
        .spec
        .size_bytes
        .checked_div(block_size as u64)
        .context("persisted size_bytes smaller than block_size")?;
    ensure!(blocks > 0, "persisted export size too small");
    usize::try_from(blocks).context("persisted export block count overflows usize")?;
    Ok(())
}
2250
2251fn config_entries_to_records(entries: &[ConfigExport]) -> Result<Vec<PersistedExportRecord>> {
2252    let mut seen = HashSet::new();
2253    let mut records = Vec::with_capacity(entries.len());
2254    for export in entries {
2255        ensure!(
2256            seen.insert(export.export_id),
2257            "duplicate export_id {} in CONFIG_EXPORTS",
2258            export.export_id
2259        );
2260        let spec = build_spec_from_export(*export)?;
2261        records.push(PersistedExportRecord {
2262            export_id: export.export_id,
2263            spec,
2264            assigned_dev_id: None,
2265        });
2266    }
2267    Ok(records)
2268}
2269
2270fn build_spec_from_export(export: ConfigExport) -> Result<ExportSpec> {
2271    let block_size = export.block_size as usize;
2272    ensure!(
2273        export.size_bytes != 0,
2274        "CONFIG_EXPORTS size_bytes must be non-zero"
2275    );
2276    ensure!(
2277        export.size_bytes.is_multiple_of(block_size as u64),
2278        "CONFIG_EXPORTS size_bytes must be multiple of block_size"
2279    );
2280    let blocks = export
2281        .size_bytes
2282        .checked_div(block_size as u64)
2283        .context("size bytes smaller than block size")?;
2284    ensure!(blocks > 0, "export size too small");
2285    usize::try_from(blocks).context("export block count overflows usize")?;
2286    Ok(ExportSpec {
2287        block_size: export.block_size,
2288        size_bytes: export.size_bytes,
2289        flags: ExportFlags::empty(),
2290    })
2291}