1use anyhow::{anyhow, ensure, Context, Result};
2use clap::{Parser, ValueEnum};
3use hyper::service::{make_service_fn, service_fn};
4use hyper::{Body, Request as HttpRequest, Response as HttpResponse, Server, StatusCode};
5use metrics_exporter_prometheus::PrometheusBuilder;
6use smoo_gadget_core::{
7 ConfigExport, ConfigExportsV0, ControlIo, DeviceHandle, DmaHeap, ExportController, ExportFlags,
8 ExportReconcileContext, ExportSpec, ExportState, FunctionfsEndpoints, GadgetConfig,
9 GadgetControl, GadgetStatusReport, IoPumpHandle, IoWork, LinkCommand, LinkController,
10 LinkState, PersistedExportRecord, RuntimeTunables, SetupCommand, SetupPacket, SmooGadget,
11 SmooUblk, SmooUblkDevice, StateStore, UblkIoRequest, UblkOp, UblkQueueRuntime,
12};
13use smoo_proto::{Ident, OpCode, Request, SMOO_STATUS_REQUEST, SMOO_STATUS_REQ_TYPE};
14use std::{
15 collections::{HashMap, HashSet, VecDeque},
16 convert::Infallible,
17 fs::File,
18 io,
19 net::SocketAddr,
20 os::fd::{FromRawFd, IntoRawFd, OwnedFd},
21 path::{Path, PathBuf},
22 sync::{
23 atomic::{AtomicU64, Ordering},
24 Arc,
25 },
26 time::{Duration, Instant},
27};
28use tokio::{
29 signal,
30 signal::unix::{signal as unix_signal, SignalKind},
31 sync::{
32 mpsc,
33 mpsc::error::{TryRecvError, TrySendError},
34 watch, Mutex, Notify, RwLock,
35 },
36 task::JoinHandle,
37};
38use tokio_util::sync::CancellationToken;
39use tracing::{debug, error, info, trace, warn};
40use usb_gadget::{
41 function::custom::{
42 CtrlReceiver, CtrlReq, CtrlSender, Custom, CustomBuilder, Endpoint, EndpointDirection,
43 Event, Interface, TransferType,
44 },
45 Class, Config, Gadget, Id, RegGadget, Strings,
46};
47
// USB interface class triple for the smoo function (0xFF = vendor-specific;
// subclass/protocol are 'S'/'M'). Consumed by setup_configfs (not in view).
const SMOO_CLASS: u8 = 0xFF;
const SMOO_SUBCLASS: u8 = 0x53;
const SMOO_PROTOCOL: u8 = 0x4D;
// Fastboot-style subclass/protocol ('B', 0x03) — presumably advertised instead
// of the smoo pair when --mimic-fastboot is set; usage not visible here.
const FASTBOOT_SUBCLASS: u8 = 0x42;
const FASTBOOT_PROTOCOL: u8 = 0x03;
// Per-request IO ceiling when --max-io is not given (4 MiB).
const DEFAULT_MAX_IO_BYTES: usize = 4 * 1024 * 1024;
// Depth of the control-plane (CONFIG_EXPORTS) mpsc channel.
const CONFIG_CHANNEL_DEPTH: usize = 32;
// Depth of the shared QueueEvent channel feeding the event loop.
const QUEUE_CHANNEL_DEPTH: usize = 128;
// Max queue events handled per event-loop tick (one direct + BATCH_MAX-1 drained).
const QUEUE_BATCH_MAX: usize = 32;
// Max parked requests replayed per maintenance slice.
const OUTSTANDING_BATCH_MAX: usize = 32;
// Idle-maintenance wakeup period for the event loop.
const IDLE_INTERVAL_MS: u64 = 10;
// Liveness tick driving drive_runtime even under load.
const LIVENESS_INTERVAL_MS: u64 = 500;
// Wall-clock budget for a single maintenance pass (outstanding drain + reconcile).
const MAINTENANCE_SLICE_MS: u64 = 200;
// Per-export reconcile timeout before the device is failed.
const RECONCILE_TIMEOUT_MS: u64 = 200;
// How long graceful shutdown waits for in-flight IO before forcing exit.
const GRACEFUL_SHUTDOWN_TIMEOUT_MS: u64 = 5_000;
63
// Command-line arguments. NOTE: clap turns `///` doc comments into --help text,
// so field notes below use `//` to keep the CLI output byte-identical.
#[derive(Debug, Parser)]
#[command(name = "smoo-gadget", version)]
#[command(about = "Expose a smoo gadget backed by FunctionFS + ublk", long_about = None)]
pub struct Args {
    // USB vendor ID, parsed from a hex string (parse_hex_u16 defined elsewhere in this file).
    #[arg(long, value_name = "HEX", default_value = "0xDEAD", value_parser = parse_hex_u16)]
    pub vendor_id: u16,
    // USB product ID, same hex format as vendor_id.
    #[arg(long, value_name = "HEX", default_value = "0xBEEF", value_parser = parse_hex_u16)]
    pub product_id: u16,
    // Number of ublk queues per exported device.
    #[arg(long, default_value_t = 1)]
    pub queue_count: u16,
    // Depth (tags) of each ublk queue.
    #[arg(long, default_value_t = 16)]
    pub queue_depth: u16,
    // Per-request IO cap; falls back to DEFAULT_MAX_IO_BYTES when absent.
    #[arg(long = "max-io", value_name = "BYTES")]
    pub max_io_bytes: Option<usize>,
    // Opt-in to DMA-buf backed buffers (experimental).
    #[arg(long)]
    pub experimental_dma_buf: bool,
    // Which DMA heap to allocate from; only consulted when experimental_dma_buf is set.
    #[arg(long, value_enum, default_value_t = DmaHeapSelection::System)]
    pub dma_heap: DmaHeapSelection,
    // Optional path for crash-recovery state; omit to disable persistence.
    #[arg(long, value_name = "PATH")]
    pub state_file: Option<PathBuf>,
    // Adopt pre-existing ublk devices from a previous session (see adopt_prepare).
    #[arg(long)]
    pub adopt: bool,
    // Prometheus /metrics listener port; 0 disables the listener.
    #[arg(long, default_value_t = 0)]
    pub metrics_port: u16,
    // Override for the FunctionFS mount directory.
    #[arg(long, value_name = "PATH")]
    pub ffs_dir: Option<PathBuf>,
    // Advertise fastboot-compatible descriptors — presumably via the FASTBOOT_*
    // constants; wiring not visible in this chunk.
    #[arg(long)]
    pub mimic_fastboot: bool,
}
105
/// CLI-facing mirror of [`DmaHeap`]; exists so clap's `ValueEnum` derive does
/// not need to live on the core library type.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum DmaHeapSelection {
    System,
    Cma,
    Reserved,
}
112
113impl From<DmaHeapSelection> for DmaHeap {
114 fn from(value: DmaHeapSelection) -> Self {
115 match value {
116 DmaHeapSelection::System => DmaHeap::System,
117 DmaHeapSelection::Cma => DmaHeap::Cma,
118 DmaHeapSelection::Reserved => DmaHeap::Reserved,
119 }
120 }
121}
122
impl Default for Args {
    /// Programmatic defaults for embedding/tests.
    ///
    /// NOTE(review): these values duplicate the clap `default_value*`
    /// attributes on [`Args`]; keep the two in sync when changing either.
    fn default() -> Self {
        Self {
            vendor_id: 0xDEAD,
            product_id: 0xBEEF,
            queue_count: 1,
            queue_depth: 16,
            max_io_bytes: None,
            experimental_dma_buf: false,
            dma_heap: DmaHeapSelection::System,
            state_file: None,
            adopt: false,
            metrics_port: 0,
            ffs_dir: None,
            mimic_fastboot: false,
        }
    }
}
141
/// Binary entry point: parse CLI args from the process environment and run.
///
/// Logs any terminal error before propagating it so the failure is visible
/// even if the caller discards the `Result`.
pub async fn run_from_env() -> Result<()> {
    let args = Args::parse();
    let result = run_impl(args).await;
    if let Err(err) = &result {
        error!(error = ?err, "smoo-gadget exiting with error");
    }
    result
}
150
/// Run with caller-supplied [`Args`] (for embedding/tests); unlike
/// [`run_from_env`] this does not log the terminal error itself.
pub async fn run_with_args(args: Args) -> Result<()> {
    run_impl(args).await
}
154
/// Main lifecycle: bring up metrics, state store, ConfigFS, the gadget core
/// and the control task, run the event loop, then tear down in reverse order.
async fn run_impl(args: Args) -> Result<()> {
    let metrics_shutdown = CancellationToken::new();
    let metrics_task = spawn_metrics_listener(args.metrics_port, metrics_shutdown.clone())?;
    let mut ublk = SmooUblk::new().context("init ublk")?;
    // A corrupt/unreadable state file must not brick startup: fall back to a
    // fresh session bound to the same path.
    let mut state_store = if let Some(path) = args.state_file.as_ref() {
        info!(path = ?path, "state file configured");
        match StateStore::load(path.clone()) {
            Ok(store) => store,
            Err(err) => {
                warn!(path = ?path, error = ?err, "failed to load state file; starting new session");
                StateStore::new_with_path(path.clone())
            }
        }
    } else {
        debug!("state file disabled; crash recovery off");
        StateStore::new()
    };

    // initialize_session / adopt_prepare are defined elsewhere in this file;
    // adopt presumably re-attaches ublk devices left over from a prior run.
    initialize_session(&mut ublk, &mut state_store).await?;
    if args.adopt {
        adopt_prepare(&mut ublk, &mut state_store).await?;
    }

    // Guards are held by binding: dropping them at end of scope tears down the
    // ConfigFS gadget and FunctionFS mount.
    let (custom, endpoints, _gadget_guard, _ffs_dir) =
        setup_configfs(&args).context("setup ConfigFS")?;

    let ident = Ident::new(0, 1);
    // DMA heap is only selected when the experimental flag is on.
    let dma_heap = args.experimental_dma_buf.then(|| args.dma_heap.into());
    let max_io_bytes = args.max_io_bytes.unwrap_or(DEFAULT_MAX_IO_BYTES);
    let gadget_config = GadgetConfig::new(
        ident,
        args.queue_count,
        args.queue_depth,
        max_io_bytes,
        dma_heap,
    );
    let gadget =
        Arc::new(SmooGadget::new(endpoints, gadget_config).context("init smoo gadget core")?);
    info!(
        ident_major = ident.major,
        ident_minor = ident.minor,
        queues = args.queue_count,
        depth = args.queue_depth,
        max_io_bytes = max_io_bytes,
        "smoo gadget initialized"
    );

    // Control plane: ep0 traffic is handled by control_loop on its own task,
    // feeding CONFIG_EXPORTS messages to the event loop via control_tx.
    let control_handler = gadget.control_handler();
    let (control_tx, control_rx) = mpsc::channel(CONFIG_CHANNEL_DEPTH);
    let (control_stop_tx, control_stop_rx) = watch::channel(false);

    // Exports recovered from the state store seed both the controller map and
    // the status snapshot the host can query over ep0.
    let exports = build_initial_exports(&state_store);
    let initial_export_count = count_active_exports(&exports);
    let status = GadgetStatusShared::new(GadgetStatus::new(
        state_store.session_id(),
        initial_export_count,
    ));
    let ep0_signals = Ep0Signals::new();
    let control_task = tokio::spawn(control_loop(
        custom,
        control_handler,
        status.clone(),
        ep0_signals.clone(),
        control_stop_rx,
        control_tx,
    ));
    let tunables = RuntimeTunables {
        queue_count: args.queue_count,
        queue_depth: args.queue_depth,
        max_io_bytes: args.max_io_bytes,
        dma_heap,
    };
    let link = LinkController::new(Duration::from_secs(3));
    // Worst case in-flight IOs = queues * depth; sizes the IO pump.
    let io_pump_capacity = args.queue_count as usize * args.queue_depth as usize;
    let runtime = RuntimeState {
        state_store,
        status,
        exports,
        queue_tasks: HashMap::new(),
        tunables,
        gadget: Some(gadget),
        io_pump: None,
        io_pump_task: None,
        io_pump_capacity,
        reconcile_queue: VecDeque::new(),
        data_plane_epoch: 0,
    };
    let result = run_event_loop(
        &mut ublk,
        runtime,
        control_rx,
        link,
        ep0_signals,
        control_stop_tx.clone(),
    )
    .await;
    // Teardown order: metrics first (cooperative), then the control task
    // (signal + abort, since it may be blocked on ep0 IO).
    metrics_shutdown.cancel();
    if let Some(task) = metrics_task {
        let _ = task.await;
    }
    let _ = control_stop_tx.send(true);
    control_task.abort();
    let _ = control_task.await;
    result
}
260
261fn spawn_metrics_listener(
262 port: u16,
263 shutdown: CancellationToken,
264) -> Result<Option<JoinHandle<()>>> {
265 if port == 0 {
266 return Ok(None);
267 }
268 let handle = PrometheusBuilder::new()
269 .install_recorder()
270 .context("install Prometheus metrics recorder")?;
271 let addr = SocketAddr::from(([0, 0, 0, 0], port));
272 let task = tokio::spawn(async move {
273 let make_svc = make_service_fn(move |_conn| {
274 let handle = handle.clone();
275 async move {
276 Ok::<_, Infallible>(service_fn(move |req: HttpRequest<Body>| {
277 let handle = handle.clone();
278 async move {
279 if req.uri().path() != "/metrics" {
280 return Ok::<_, Infallible>(
281 HttpResponse::builder()
282 .status(StatusCode::NOT_FOUND)
283 .body(Body::from("not found"))
284 .unwrap(),
285 );
286 }
287 let body = handle.render();
288 Ok::<_, Infallible>(
289 HttpResponse::builder()
290 .status(StatusCode::OK)
291 .header(hyper::header::CONTENT_TYPE, "text/plain; version=0.0.4")
292 .body(Body::from(body))
293 .unwrap(),
294 )
295 }
296 }))
297 }
298 });
299
300 let server = Server::bind(&addr).serve(make_svc);
301 let graceful = server.with_graceful_shutdown(async {
302 shutdown.cancelled().await;
303 });
304
305 if let Err(err) = graceful.await {
306 warn!(error = %err, %addr, "metrics server error");
307 }
308 });
309
310 info!(%addr, "metrics listener started");
311 Ok(Some(task))
312}
313
/// Point-in-time status reported to the host over ep0: which session this is
/// and how many exports are currently active.
#[derive(Clone, Copy, Debug)]
struct GadgetStatus {
    session_id: u64,
    export_count: u32,
}
319
impl GadgetStatus {
    /// Plain constructor; both fields are set once here and `export_count`
    /// is later updated through [`GadgetStatusShared::set_export_count`].
    fn new(session_id: u64, export_count: u32) -> Self {
        Self {
            session_id,
            export_count,
        }
    }
}
328
/// Cheaply-cloneable handle to the status shared between the event loop
/// (writer) and the ep0 control task (reader).
#[derive(Clone)]
struct GadgetStatusShared {
    inner: Arc<RwLock<GadgetStatus>>,
}
333
334impl GadgetStatusShared {
335 fn new(initial: GadgetStatus) -> Self {
336 Self {
337 inner: Arc::new(RwLock::new(initial)),
338 }
339 }
340
341 async fn snapshot(&self) -> GadgetStatus {
342 *self.inner.read().await
343 }
344
345 async fn report(&self) -> GadgetStatusReport {
346 let snapshot = self.snapshot().await;
347 GadgetStatusReport::new(snapshot.session_id, snapshot.export_count)
348 }
349
350 async fn set_export_count(&self, export_count: u32) {
351 let mut guard = self.inner.write().await;
352 guard.export_count = export_count;
353 }
354}
355
/// Signals the ep0 control task raises for the event loop: a status-ping
/// counter, a buffered list of lifecycle events with its own counter, and a
/// `Notify` to wake the loop when either changes.
#[derive(Clone)]
struct Ep0Signals {
    status_seq: Arc<AtomicU64>,
    lifecycle_seq: Arc<AtomicU64>,
    lifecycle: Arc<Mutex<Vec<Event<'static>>>>,
    notify: Arc<Notify>,
}
363
impl Ep0Signals {
    fn new() -> Self {
        Self {
            status_seq: Arc::new(AtomicU64::new(0)),
            lifecycle_seq: Arc::new(AtomicU64::new(0)),
            lifecycle: Arc::new(Mutex::new(Vec::new())),
            notify: Arc::new(Notify::new()),
        }
    }

    /// Current status-ping sequence; consumers diff against a remembered value.
    fn status_seq(&self) -> u64 {
        self.status_seq.load(Ordering::Relaxed)
    }

    /// Current lifecycle sequence; consumers diff against a remembered value.
    fn lifecycle_seq(&self) -> u64 {
        self.lifecycle_seq.load(Ordering::Relaxed)
    }

    /// Record a host status ping and wake the event loop.
    // Relaxed suffices: the counters are change-detectors, not ordering fences.
    fn mark_status_ping(&self) {
        self.status_seq.fetch_add(1, Ordering::Relaxed);
        self.notify.notify_waiters();
    }

    /// Buffer a lifecycle event and wake the event loop.
    // The seq bump happens while the lock is held, so a reader that observes
    // the new seq and then locks is guaranteed to see the event.
    async fn push_lifecycle(&self, event: Event<'static>) {
        let mut guard = self.lifecycle.lock().await;
        guard.push(event);
        self.lifecycle_seq.fetch_add(1, Ordering::Relaxed);
        self.notify.notify_waiters();
    }

    /// Drain all buffered lifecycle events in arrival order.
    async fn take_lifecycle(&self) -> Vec<Event<'static>> {
        let mut guard = self.lifecycle.lock().await;
        guard.drain(..).collect()
    }

    /// Shared waker for `tokio::select!` integration.
    fn notifier(&self) -> Arc<Notify> {
        self.notify.clone()
    }
}
403
/// All mutable state owned by the event loop.
struct RuntimeState {
    // Persisted export records for crash recovery.
    state_store: StateStore,
    // Status shared with the ep0 control task.
    status: GadgetStatusShared,
    // Live export controllers, keyed by export_id.
    exports: HashMap<u32, ExportController>,
    // Per-export queue-reader tasks, keyed by export_id.
    queue_tasks: HashMap<u32, QueueTaskSet>,
    tunables: RuntimeTunables,
    // Gadget core; None once the data plane has been torn down.
    gadget: Option<Arc<SmooGadget>>,
    // IO pump handle/task; lazily (re)created by ensure_data_plane.
    io_pump: Option<IoPumpHandle>,
    io_pump_task: Option<JoinHandle<()>>,
    io_pump_capacity: usize,
    // Exports waiting for a reconcile pass, FIFO.
    reconcile_queue: VecDeque<u32>,
    // Incremented per data-plane generation so stale IoError events can be ignored.
    data_plane_epoch: u64,
}
417
impl RuntimeState {
    /// Shared status handle (read/update the export count).
    fn status(&self) -> &GadgetStatusShared {
        &self.status
    }

    /// Mutable access to the persistence layer.
    fn state_store(&mut self) -> &mut StateStore {
        &mut self.state_store
    }
}
427
/// Sender half of the channel queue tasks use to hand IO to the event loop.
type QueueSender = mpsc::Sender<QueueEvent>;

/// Handles for one export's queue-reader tasks plus their stop signal.
struct QueueTaskSet {
    stop: watch::Sender<bool>,
    handles: Vec<JoinHandle<()>>,
}
434
435impl QueueTaskSet {
436 async fn shutdown(self) {
437 let _ = self.stop.send(true);
438 for handle in self.handles {
439 let _ = handle.await;
440 }
441 }
442
443 fn abort(self) {
444 let _ = self.stop.send(true);
445 for handle in self.handles {
446 handle.abort();
447 }
448 }
449}
450
/// Messages from queue-reader tasks to the event loop.
enum QueueEvent {
    /// A ublk IO request fetched from `dev_id` for `export_id`; the queue
    /// runtime rides along so the loop can complete/replay the request.
    Request {
        export_id: u32,
        dev_id: u32,
        request: UblkIoRequest,
        queues: Arc<UblkQueueRuntime>,
    },
    /// The queue reader for (`export_id`, `dev_id`) hit a fatal error and exited.
    QueueError {
        export_id: u32,
        dev_id: u32,
        error: anyhow::Error,
    },
}
464
/// Out-of-band data-plane failures; `epoch` lets the event loop discard
/// errors from an already-replaced pump generation.
#[derive(Debug)]
enum DataPlaneEvent {
    IoError { epoch: u64, error: io::Error },
}
469
470fn notify_data_plane_error(tx: &mpsc::UnboundedSender<DataPlaneEvent>, epoch: u64, err: io::Error) {
471 let _ = tx.send(DataPlaneEvent::IoError { epoch, error: err });
472}
473
/// A ublk request that could not be sent to the host (link down / device not
/// ready) and is parked for later replay; `dev_id` detects stale devices.
struct OutstandingRequest {
    dev_id: u32,
    request: UblkIoRequest,
    queues: Arc<UblkQueueRuntime>,
}
479
480fn build_initial_exports(state_store: &StateStore) -> HashMap<u32, ExportController> {
481 let mut exports = HashMap::new();
482 for record in state_store.records() {
483 if exports.contains_key(&record.export_id) {
484 warn!(
485 export_id = record.export_id,
486 "duplicate export_id in state store; skipping"
487 );
488 continue;
489 }
490 let state = match record.assigned_dev_id {
491 Some(dev_id) => ExportState::RecoveringPending { dev_id },
492 None => ExportState::New,
493 };
494 exports.insert(
495 record.export_id,
496 ExportController::new(record.export_id, record.spec.clone(), state),
497 );
498 }
499 exports
500}
501
502fn spawn_queue_tasks(
503 export_id: u32,
504 dev_id: u32,
505 queues: Arc<UblkQueueRuntime>,
506 tx: QueueSender,
507) -> QueueTaskSet {
508 let (stop, stop_rx) = watch::channel(false);
509 let mut handles = Vec::new();
510 for queue_id in 0..queues.queue_count() {
511 let mut stop_rx = stop_rx.clone();
512 let queues = queues.clone();
513 let tx = tx.clone();
514 handles.push(tokio::spawn(async move {
515 queue_task_loop(export_id, dev_id, queue_id, queues, &mut stop_rx, tx).await;
516 }));
517 }
518 QueueTaskSet { stop, handles }
519}
520
/// Per-queue reader: pull ublk IO requests and forward them to the event
/// loop, exiting when the stop signal fires, the channel closes, or the
/// queue errors out.
///
/// Every await is raced against `stop.changed()` so shutdown is never blocked
/// by a full channel or a quiescent queue.
async fn queue_task_loop(
    export_id: u32,
    dev_id: u32,
    queue_id: u16,
    queues: Arc<UblkQueueRuntime>,
    stop: &mut watch::Receiver<bool>,
    tx: QueueSender,
) {
    loop {
        tokio::select! {
            _changed = stop.changed() => {
                break;
            }
            req = queues.next_io(queue_id) => {
                match req {
                    Ok(request) => {
                        // Racing the (bounded) send against stop keeps shutdown
                        // responsive even under backpressure.
                        let send_fut = tx.send(QueueEvent::Request { export_id, dev_id, request, queues: queues.clone() });
                        tokio::select! {
                            res = send_fut => {
                                // Channel closed => event loop is gone; exit.
                                if res.is_err() {
                                    break;
                                }
                            }
                            _ = stop.changed() => break,
                        }
                    }
                    Err(err) => {
                        // Only report the error if we weren't asked to stop —
                        // teardown routinely produces spurious queue errors.
                        if !*stop.borrow() {
                            let send_fut = tx.send(QueueEvent::QueueError { export_id, dev_id, error: err });
                            let _ = tokio::select! {
                                res = send_fut => res,
                                _ = stop.changed() => Ok(()),
                            };
                        }
                        break;
                    }
                }
            }
        }
    }
}
562
563async fn sync_queue_tasks(runtime: &mut RuntimeState, queue_tx: &QueueSender) {
564 if runtime.io_pump.is_none() {
565 stop_all_queue_tasks(runtime).await;
566 return;
567 }
568 let mut to_stop: Vec<u32> = runtime
569 .queue_tasks
570 .keys()
571 .cloned()
572 .filter(|export_id| !runtime.exports.contains_key(export_id))
573 .collect();
574
575 for (&export_id, controller) in runtime.exports.iter() {
576 let should_run = controller
577 .device_handle()
578 .map(|h| {
579 matches!(
580 h,
581 DeviceHandle::Online { .. } | DeviceHandle::Starting { .. }
582 )
583 })
584 .unwrap_or(false);
585 let running = runtime.queue_tasks.contains_key(&export_id);
586 if should_run && runtime.io_pump.is_some() && !running {
587 if let Some(handle) = controller.device_handle() {
588 if let Some(queues) = handle.queues() {
589 let tasks =
590 spawn_queue_tasks(export_id, handle.dev_id(), queues, queue_tx.clone());
591 runtime.queue_tasks.insert(export_id, tasks);
592 }
593 }
594 } else if !should_run && running {
595 to_stop.push(export_id);
596 }
597 }
598
599 for export_id in to_stop {
600 if let Some(tasks) = runtime.queue_tasks.remove(&export_id) {
601 tasks.shutdown().await;
602 }
603 }
604}
605
606async fn stop_all_queue_tasks(runtime: &mut RuntimeState) {
607 let mut tasks = std::mem::take(&mut runtime.queue_tasks);
608 for (_, taskset) in tasks.drain() {
609 taskset.shutdown().await;
610 }
611}
612
613async fn ensure_data_plane(runtime: &mut RuntimeState) {
614 if runtime.gadget.is_none() {
615 if let Some(pump) = runtime.io_pump.take() {
616 drop(pump);
617 }
618 if let Some(task) = runtime.io_pump_task.take() {
619 task.abort();
620 let _ = task.await;
621 }
622 return;
623 }
624
625 if runtime.io_pump.is_none() {
626 if let Some(gadget) = runtime.gadget.clone() {
627 let (handle, task) = IoPumpHandle::spawn(gadget, runtime.io_pump_capacity);
628 runtime.io_pump = Some(handle);
629 runtime.io_pump_task = Some(task);
630 }
631 }
632}
633
/// Fold pending ep0 signals into the link controller: status pings and
/// buffered lifecycle events, each tracked by a last-seen sequence number.
async fn drain_ep0_signals(
    ep0_signals: &Ep0Signals,
    last_status_seq: &mut u64,
    last_lifecycle_seq: &mut u64,
    link: &mut LinkController,
) {
    let status_seq = ep0_signals.status_seq();
    if status_seq != *last_status_seq {
        *last_status_seq = status_seq;
        link.on_status_ping();
    }
    if ep0_signals.lifecycle_seq() != *last_lifecycle_seq {
        let events = ep0_signals.take_lifecycle().await;
        // Re-read the seq AFTER draining: events pushed between the first read
        // and the drain are already in `events`, so this avoids a re-drain.
        *last_lifecycle_seq = ep0_signals.lifecycle_seq();
        for event in events {
            link.on_ep0_event(event);
        }
    }
}
653
/// Opportunistically drain up to `QUEUE_BATCH_MAX - 1` additional queue
/// events without yielding. The caller has already handled one event this
/// tick, so the per-tick total is exactly `QUEUE_BATCH_MAX` — the `- 1`
/// is deliberate, not an off-by-one.
async fn drain_queue_batch(
    runtime: &mut RuntimeState,
    link: &mut LinkController,
    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
    data_plane_tx: &mpsc::UnboundedSender<DataPlaneEvent>,
    queue_rx: &mut mpsc::Receiver<QueueEvent>,
) -> Result<()> {
    let mut processed = 0;
    while processed < QUEUE_BATCH_MAX.saturating_sub(1) {
        match queue_rx.try_recv() {
            Ok(evt) => {
                handle_queue_event(runtime, link, outstanding, data_plane_tx, evt).await?;
                processed += 1;
            }
            // Empty and Disconnected both just end the batch; a closed channel
            // is handled by the main loop, not here.
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    if processed >= QUEUE_BATCH_MAX.saturating_sub(1) {
        trace!(processed, "queue batch truncated; will continue next tick");
    }
    Ok(())
}
676
677fn pop_next_outstanding(
678 outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
679) -> Option<(u32, u16, u16, OutstandingRequest)> {
680 let (export_id, (queue_id, tag)) = outstanding.iter().find_map(|(export_id, reqs)| {
681 reqs.keys()
682 .next()
683 .map(|(queue, tag)| (*export_id, (*queue, *tag)))
684 })?;
685 let pending = outstanding
686 .get_mut(&export_id)
687 .and_then(|map| map.remove(&(queue_id, tag)))?;
688 if let Some(map) = outstanding.get(&export_id) {
689 if map.is_empty() {
690 outstanding.remove(&export_id);
691 }
692 }
693 Some((export_id, queue_id, tag, pending))
694}
695
/// Replay parked requests to the host, bounded by `OUTSTANDING_BATCH_MAX`
/// and the maintenance `deadline`. Requests are dropped for missing exports
/// (completed with ENODEV) or stale devices, re-parked when the device is
/// not ready or the link errors, and counted as processed on success.
async fn drain_outstanding_bounded(
    runtime: &mut RuntimeState,
    link: &mut LinkController,
    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
    deadline: Instant,
) -> Result<()> {
    if outstanding.is_empty() {
        return Ok(());
    }
    // Replaying over a dead link would just re-park everything; wait it out.
    if link.state() != LinkState::Online {
        trace!(
            outstanding_exports = outstanding.len(),
            "link not online; deferring outstanding IO drain"
        );
        return Ok(());
    }
    let Some(pump) = runtime.io_pump.as_ref() else {
        trace!(
            outstanding_exports = outstanding.len(),
            "no gadget endpoints available; deferring outstanding IO drain"
        );
        return Ok(());
    };

    let mut processed = 0usize;
    while processed < OUTSTANDING_BATCH_MAX && Instant::now() < deadline {
        let Some((export_id, _queue_id, _tag, pending)) = pop_next_outstanding(outstanding) else {
            break;
        };
        // Export is gone: fail the IO back to ublk with ENODEV.
        let Some(ctrl) = runtime.exports.get(&export_id) else {
            let _ = pending.queues.complete_io(pending.request, -libc::ENODEV);
            continue;
        };
        // No device handle yet: re-park and stop this pass (`break`, not
        // `continue`, so we don't spin popping/parking the same entries).
        let Some(handle) = ctrl.device_handle() else {
            park_request(
                outstanding,
                export_id,
                pending.dev_id,
                pending.queues.clone(),
                pending.request,
            );
            break;
        };
        // The export was re-created on a different ublk device; the old
        // request belongs to a dead device and is simply dropped.
        if handle.dev_id() != pending.dev_id {
            trace!(
                export_id,
                stale_dev = pending.dev_id,
                current_dev = handle.dev_id(),
                "dropping outstanding for stale device"
            );
            continue;
        }
        if !handle.is_online() {
            park_request(
                outstanding,
                export_id,
                pending.dev_id,
                pending.queues.clone(),
                pending.request,
            );
            break;
        }
        let Some(queues) = handle.queues() else {
            park_request(
                outstanding,
                export_id,
                pending.dev_id,
                pending.queues.clone(),
                pending.request,
            );
            break;
        };
        let req = pending.request;
        trace!(
            export_id,
            dev_id = pending.dev_id,
            queue = req.queue_id,
            tag = req.tag,
            "replaying outstanding IO to host"
        );
        // Replay failed: inform the link controller and re-park the request
        // so it is retried once the link recovers.
        if let Err(err) = handle_request(pump.clone(), export_id, queues.clone(), req).await {
            let io_err = io_error_from_anyhow(&err);
            link.on_io_error(&io_err);
            park_request(
                outstanding,
                export_id,
                pending.dev_id,
                pending.queues.clone(),
                req,
            );
            warn!(
                export_id,
                queue = req.queue_id,
                tag = req.tag,
                error = ?err,
                "link error replaying outstanding IO; parked again"
            );
            break;
        }
        processed += 1;
    }

    if !outstanding.is_empty() {
        trace!(
            remaining_exports = outstanding.len(),
            processed,
            "outstanding drain truncated"
        );
    }
    Ok(())
}
807
/// Run export reconciliation until `deadline`: enqueue every export that
/// reports `needs_reconcile`, then pop and reconcile one at a time with a
/// per-export timeout, re-queueing exports that still need work.
async fn run_reconcile_slice(
    ublk: &mut SmooUblk,
    runtime: &mut RuntimeState,
    deadline: Instant,
) -> Result<()> {
    let now = Instant::now();
    for (&export_id, ctrl) in runtime.exports.iter() {
        // `contains` keeps the FIFO free of duplicates.
        if ctrl.needs_reconcile(now) && !runtime.reconcile_queue.contains(&export_id) {
            runtime.reconcile_queue.push_back(export_id);
        }
    }

    while Instant::now() < deadline {
        let Some(export_id) = runtime.reconcile_queue.pop_front() else {
            break;
        };
        // Re-check with a fresh timestamp: the need may have lapsed (e.g.
        // backoff) since the export was queued.
        let now = Instant::now();
        let needs_reconcile = runtime
            .exports
            .get(&export_id)
            .is_some_and(|ctrl| ctrl.needs_reconcile(now));
        if !needs_reconcile {
            continue;
        }

        let tunables = runtime.tunables;
        // Temporarily remove the controller so we can borrow `runtime`
        // mutably (for the state store) inside the reconcile context.
        let mut controller = match runtime.exports.remove(&export_id) {
            Some(ctrl) => ctrl,
            None => continue,
        };
        {
            let mut cx = ExportReconcileContext {
                ublk,
                state_store: runtime.state_store(),
                tunables,
            };
            // Bound each reconcile so one stuck export cannot eat the slice.
            match tokio::time::timeout(
                Duration::from_millis(RECONCILE_TIMEOUT_MS),
                controller.reconcile(&mut cx),
            )
            .await
            {
                Ok(Ok(())) => {}
                Ok(Err(err)) => {
                    warn!(export_id, error = ?err, "reconcile failed; backing off");
                    controller.fail_device(format!("reconcile failed: {err:#}"));
                }
                Err(_) => {
                    warn!(export_id, "reconcile timed out; backing off");
                    controller.fail_device("reconcile timed out".to_string());
                }
            }
        }
        // Always reinsert the controller, even after a failure.
        let needs_more = controller.needs_reconcile(Instant::now());
        runtime.exports.insert(export_id, controller);
        if needs_more {
            runtime.reconcile_queue.push_back(export_id);
        }

        if Instant::now() >= deadline {
            break;
        }
    }
    Ok(())
}
873
/// One maintenance pass: tick the link, apply its commands, keep the data
/// plane and queue tasks in sync, replay parked IO, optionally reconcile
/// exports, then publish the active export count. All bounded-time work
/// shares a single `MAINTENANCE_SLICE_MS` deadline.
#[allow(clippy::too_many_arguments)]
async fn drive_runtime(
    ublk: &mut SmooUblk,
    runtime: &mut RuntimeState,
    link: &mut LinkController,
    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
    queue_tx: Option<&QueueSender>,
    allow_reconcile: bool,
) -> Result<()> {
    let deadline = Instant::now() + Duration::from_millis(MAINTENANCE_SLICE_MS);
    link.tick(Instant::now());
    process_link_commands(runtime, link).await?;
    ensure_data_plane(runtime).await;
    // No sender means new IO intake is disabled (graceful shutdown).
    if let Some(tx) = queue_tx {
        sync_queue_tasks(runtime, tx).await;
    }
    drain_outstanding_bounded(runtime, link, outstanding, deadline).await?;
    // Reconcile is skipped during shutdown (allow_reconcile = false).
    if allow_reconcile {
        run_reconcile_slice(ublk, runtime, deadline).await?;
    }
    let active_count = count_active_exports(&runtime.exports);
    runtime.status().set_export_count(active_count).await;
    Ok(())
}
898
/// Apply a host CONFIG_EXPORTS message: update the export set, drop parked
/// requests for exports that disappeared, then run any link commands the
/// change produced.
async fn handle_config_message(
    ublk: &mut SmooUblk,
    runtime: &mut RuntimeState,
    link: &mut LinkController,
    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
    config: ConfigExportsV0,
) -> Result<()> {
    apply_config(ublk, runtime, config).await?;
    prune_outstanding_for_missing_exports(outstanding, &runtime.exports);
    process_link_commands(runtime, link).await?;
    Ok(())
}
911
912async fn stop_accepting_new_io(runtime: &mut RuntimeState, queue_tx: &mut Option<QueueSender>) {
913 stop_all_queue_tasks(runtime).await;
914 *queue_tx = None;
915}
916
/// Shutdown progression for the event loop: normal operation, a bounded
/// drain window after the first signal, then immediate exit.
enum ShutdownState {
    Running,
    Graceful { deadline: Instant },
    Forceful,
}
922
923async fn run_event_loop(
924 ublk: &mut SmooUblk,
925 mut runtime: RuntimeState,
926 mut control_rx: mpsc::Receiver<ConfigExportsV0>,
927 mut link: LinkController,
928 ep0_signals: Ep0Signals,
929 control_stop: watch::Sender<bool>,
930) -> Result<()> {
931 let mut shutdown = Some(Box::pin(signal::ctrl_c()));
932 let mut hup = unix_signal(SignalKind::hangup()).context("install SIGHUP handler")?;
933 let idle_sleep = tokio::time::sleep(Duration::from_millis(IDLE_INTERVAL_MS));
934 tokio::pin!(idle_sleep);
935 let mut liveness_tick = tokio::time::interval(Duration::from_millis(LIVENESS_INTERVAL_MS));
936 liveness_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
937
938 let mut outstanding: HashMap<u32, HashMap<(u16, u16), OutstandingRequest>> = HashMap::new();
939 let (queue_tx_init, mut queue_rx) = mpsc::channel::<QueueEvent>(QUEUE_CHANNEL_DEPTH);
940 let mut queue_tx: Option<QueueSender> = Some(queue_tx_init);
941 let (data_plane_tx, mut data_plane_rx) = mpsc::unbounded_channel::<DataPlaneEvent>();
942 let ep0_notify = ep0_signals.notifier();
943
944 let mut io_error = None;
945 let mut exit_reason: Option<String> = None;
946 let mut recovery_exit = false;
947 let mut shutdown_state = ShutdownState::Running;
948 let mut last_status_seq = ep0_signals.status_seq();
949 let mut last_lifecycle_seq = ep0_signals.lifecycle_seq();
950
951 loop {
952 idle_sleep
953 .as_mut()
954 .reset(tokio::time::Instant::now() + Duration::from_millis(IDLE_INTERVAL_MS));
955
956 drain_ep0_signals(
957 &ep0_signals,
958 &mut last_status_seq,
959 &mut last_lifecycle_seq,
960 &mut link,
961 )
962 .await;
963 process_link_commands(&mut runtime, &mut link).await?;
964 ensure_data_plane(&mut runtime).await;
965
966 if runtime
967 .io_pump_task
968 .as_ref()
969 .is_some_and(|task| task.is_finished())
970 {
971 if let Some(task) = runtime.io_pump_task.take() {
972 let _ = task.await;
973 }
974 warn!(
975 epoch = runtime.data_plane_epoch,
976 "io pump task exited unexpectedly; notifying data plane"
977 );
978 notify_data_plane_error(
979 &data_plane_tx,
980 runtime.data_plane_epoch,
981 io::Error::other("io pump exited"),
982 );
983 }
984
985 if let ShutdownState::Graceful { deadline } = shutdown_state {
986 if Instant::now() >= deadline {
987 warn!("graceful shutdown timed out; forcing shutdown");
988 shutdown_state = ShutdownState::Forceful;
989 }
990 }
991
992 if matches!(shutdown_state, ShutdownState::Forceful) {
993 note_exit_reason(&mut exit_reason, "forceful shutdown state entered");
994 break;
995 }
996
997 let ep0_notified = ep0_notify.notified();
998 tokio::pin!(ep0_notified);
999 tokio::select! { biased;
1000 _ = async {
1001 if let Some(fut) = shutdown.as_mut() {
1002 let _ = fut.as_mut().await;
1003 }
1004 }, if shutdown.is_some() => {
1005 shutdown = None;
1006 match shutdown_state {
1007 ShutdownState::Running => {
1008 info!("shutdown signal received; entering graceful shutdown");
1009 shutdown_state = ShutdownState::Graceful {
1010 deadline: Instant::now() + Duration::from_millis(GRACEFUL_SHUTDOWN_TIMEOUT_MS),
1011 };
1012 stop_accepting_new_io(&mut runtime, &mut queue_tx).await;
1013 let _ = control_stop.send(true);
1014 }
1015 ShutdownState::Graceful { .. } => {
1016 warn!("second shutdown signal; forcing shutdown");
1017 note_exit_reason(&mut exit_reason, "second shutdown signal received");
1018 shutdown_state = ShutdownState::Forceful;
1019 break;
1020 }
1021 ShutdownState::Forceful => break,
1022 }
1023 }
1024 Some(_) = hup.recv() => {
1025 info!("SIGHUP received; initiating user recovery");
1026 note_exit_reason(&mut exit_reason, "SIGHUP received; entering user recovery");
1027 let _ = control_stop.send(true);
1028 begin_user_recovery(ublk, &mut runtime).await?;
1029 recovery_exit = true;
1030 break;
1031 }
1032 event = data_plane_rx.recv() => {
1033 if let Some(event) = event {
1034 if let Err(err) = handle_data_plane_event(
1035 &mut runtime,
1036 &mut link,
1037 event,
1038 )
1039 .await
1040 {
1041 warn!(error = ?err, "run_event_loop: data plane event handling failed");
1042 note_exit_reason(
1043 &mut exit_reason,
1044 format!("data plane event handling failed: {err:#}"),
1045 );
1046 io_error = Some(err);
1047 break;
1048 }
1049 }
1050 }
1051 Some(config) = control_rx.recv(), if matches!(shutdown_state, ShutdownState::Running) => {
1052 if let Err(err) = handle_config_message(
1053 ublk,
1054 &mut runtime,
1055 &mut link,
1056 &mut outstanding,
1057 config,
1058 )
1059 .await
1060 {
1061 warn!(error = ?err, "CONFIG_EXPORTS application failed");
1062 }
1063 }
1064 _ = ep0_notified.as_mut() => {
1065 continue;
1066 }
1067 maybe_evt = queue_rx.recv(), if !matches!(shutdown_state, ShutdownState::Forceful) && runtime.io_pump.is_some() => {
1068 if let Some(evt) = maybe_evt {
1069 if let Err(err) = handle_queue_event(
1070 &mut runtime,
1071 &mut link,
1072 &mut outstanding,
1073 &data_plane_tx,
1074 evt,
1075 )
1076 .await
1077 {
1078 warn!(error = ?err, "run_event_loop: queue event handling failed");
1079 note_exit_reason(
1080 &mut exit_reason,
1081 format!("queue event handling failed: {err:#}"),
1082 );
1083 io_error = Some(err);
1084 break;
1085 }
1086 if let Err(err) = drain_queue_batch(
1087 &mut runtime,
1088 &mut link,
1089 &mut outstanding,
1090 &data_plane_tx,
1091 &mut queue_rx,
1092 )
1093 .await
1094 {
1095 warn!(error = ?err, "run_event_loop: queue batch drain failed");
1096 note_exit_reason(
1097 &mut exit_reason,
1098 format!("queue batch drain failed: {err:#}"),
1099 );
1100 io_error = Some(err);
1101 break;
1102 }
1103 if let Err(err) = drive_runtime(
1104 ublk,
1105 &mut runtime,
1106 &mut link,
1107 &mut outstanding,
1108 queue_tx.as_ref(),
1109 false,
1110 ).await {
1111 warn!(error = ?err, "run_event_loop: drive_runtime failed after queue events");
1112 note_exit_reason(
1113 &mut exit_reason,
1114 format!("drive_runtime after queue events failed: {err:#}"),
1115 );
1116 io_error = Some(err);
1117 break;
1118 }
1119 }
1120 }
1121 _ = liveness_tick.tick() => {
1122 if let Err(err) = drive_runtime(
1123 ublk,
1124 &mut runtime,
1125 &mut link,
1126 &mut outstanding,
1127 queue_tx.as_ref(),
1128 false,
1129 ).await {
1130 warn!(error = ?err, "run_event_loop: drive_runtime failed on liveness tick");
1131 note_exit_reason(
1132 &mut exit_reason,
1133 format!("drive_runtime on liveness tick failed: {err:#}"),
1134 );
1135 io_error = Some(err);
1136 break;
1137 }
1138 }
1139 _ = &mut idle_sleep => {
1140 let allow_reconcile = matches!(shutdown_state, ShutdownState::Running);
1141 if let Err(err) = drive_runtime(
1142 ublk,
1143 &mut runtime,
1144 &mut link,
1145 &mut outstanding,
1146 queue_tx.as_ref(),
1147 allow_reconcile,
1148 ).await {
1149 warn!(error = ?err, "run_event_loop: drive_runtime failed on idle maintenance");
1150 note_exit_reason(
1151 &mut exit_reason,
1152 format!("drive_runtime on idle maintenance failed: {err:#}"),
1153 );
1154 io_error = Some(err);
1155 break;
1156 }
1157 }
1158 }
1159
1160 if let ShutdownState::Graceful { deadline } = shutdown_state {
1161 if let Err(err) = drive_runtime(
1162 ublk,
1163 &mut runtime,
1164 &mut link,
1165 &mut outstanding,
1166 queue_tx.as_ref(),
1167 false,
1168 )
1169 .await
1170 {
1171 warn!(error = ?err, "run_event_loop: drive_runtime failed during graceful shutdown");
1172 note_exit_reason(
1173 &mut exit_reason,
1174 format!("drive_runtime during graceful shutdown failed: {err:#}"),
1175 );
1176 io_error = Some(err);
1177 break;
1178 }
1179 let outstanding_empty = outstanding.is_empty();
1180 let queue_drained = queue_rx.is_closed() && queue_rx.is_empty();
1181 if outstanding_empty && queue_drained {
1182 note_exit_reason(&mut exit_reason, "graceful shutdown complete");
1183 info!("graceful shutdown complete; exiting");
1184 break;
1185 }
1186 if Instant::now() >= deadline {
1187 warn!("graceful shutdown deadline reached; forcing shutdown");
1188 note_exit_reason(&mut exit_reason, "graceful shutdown deadline reached");
1189 shutdown_state = ShutdownState::Forceful;
1190 shutdown = None;
1191 }
1192 }
1193 }
1194
1195 if let Some(reason) = exit_reason.as_deref() {
1196 warn!(%reason, recovery_exit, "event loop exiting");
1197 } else {
1198 info!(recovery_exit, "event loop exiting");
1199 }
1200
1201 if let Some(pump) = runtime.io_pump.take() {
1202 drop(pump);
1203 }
1204 if let Some(task) = runtime.io_pump_task.take() {
1205 task.abort();
1206 let _ = task.await;
1207 }
1208
1209 if recovery_exit {
1210 return Ok(());
1211 }
1212
1213 let _ = control_stop.send(true);
1214 let forceful = matches!(shutdown_state, ShutdownState::Forceful);
1215 info!(
1216 shutdown_reason = exit_reason.as_deref().unwrap_or("unspecified"),
1217 forceful, "cleaning up ublk devices"
1218 );
1219 cleanup_ublk_devices(ublk, &mut runtime, exit_reason.as_deref(), forceful).await?;
1220 runtime.status().set_export_count(0).await;
1221
1222 if let Err(err) = runtime.state_store().remove_file() {
1223 warn!(error = ?err, "failed to remove state file on shutdown");
1224 } else {
1225 debug!("state file removed on shutdown");
1226 }
1227
1228 if let Some(err) = io_error {
1229 Err(err)
1230 } else {
1231 Ok(())
1232 }
1233}
1234
/// Translates one ublk I/O request into a smoo protocol `Request` and hands it
/// to the I/O pump for transfer to the host.
///
/// Requests that cannot be forwarded (invalid length, unsupported opcode, or a
/// payload larger than the queue buffer) are completed immediately on the ublk
/// side with a negative errno and never reach the host.
///
/// Returns an error only when completing an invalid request fails or when the
/// pump rejects the submission; ordinary per-request failures are absorbed.
async fn handle_request(
    pump: IoPumpHandle,
    export_id: u32,
    queues: Arc<UblkQueueRuntime>,
    req: UblkIoRequest,
) -> Result<()> {
    let block_size = queues.block_size();
    // Convert sectors -> bytes up front; overflow is treated as a client error
    // and completed with the mapped errno rather than propagated.
    let req_len = match request_byte_len(&req, block_size) {
        Ok(len) => len,
        Err(err) => {
            let errno = errno_from_io(&err);
            warn!(
                queue = req.queue_id,
                tag = req.tag,
                errno = errno,
                ?req.op,
                "invalid request length: {err}"
            );
            queues
                .complete_io(req, -errno)
                .context("complete invalid request")?;
            return Ok(());
        }
    };

    // Ops with no smoo equivalent are rejected with EOPNOTSUPP.
    let opcode = match opcode_from_ublk(req.op) {
        Some(op) => op,
        None => {
            warn!(
                queue = req.queue_id,
                tag = req.tag,
                op = ?req.op,
                "unsupported ublk opcode"
            );
            queues
                .complete_io(req, -libc::EOPNOTSUPP)
                .context("complete unsupported opcode")?;
            return Ok(());
        }
    };

    trace!(
        export_id,
        dev_id = queues.dev_id(),
        queue = req.queue_id,
        tag = req.tag,
        op = ?req.op,
        req_bytes = req_len,
        block_size,
        "handle_request begin"
    );

    // Data-carrying ops must fit the per-queue transfer buffer; anything
    // larger cannot be staged and is failed with EINVAL.
    if matches!(opcode, OpCode::Read | OpCode::Write) && req_len > 0 {
        let capacity = queues.buffer_len();
        if req_len > capacity {
            warn!(
                queue = req.queue_id,
                tag = req.tag,
                req_bytes = req_len,
                buf_cap = capacity,
                "request exceeds buffer capacity"
            );
            queues
                .complete_io(req, -libc::EINVAL)
                .context("complete oversized request")?;
            return Ok(());
        }
    }

    let num_blocks = u32::try_from(req_len / block_size)
        .context("request block count exceeds protocol limit")?;
    // request_id packs (queue_id, tag) so the response can be routed back.
    let request_id = make_request_id(req.queue_id, req.tag);
    let proto_req = Request::new(export_id, request_id, opcode, req.sector, num_blocks, 0);
    trace!(
        export_id,
        dev_id = queues.dev_id(),
        queue = req.queue_id,
        tag = req.tag,
        op = ?opcode,
        num_blocks,
        req_bytes = req_len,
        "dispatching smoo Request through pump"
    );
    // NOTE(review): `req` is written into `ublk_request` and then read again
    // for `queue_id`/`tag`; this compiles, so `UblkIoRequest` is presumably
    // `Copy` — confirm in smoo_gadget_core before relying on it.
    let work = IoWork {
        ublk_request: req,
        request: proto_req,
        req_len,
        block_size,
        queue_id: req.queue_id,
        tag: req.tag,
        op: opcode,
        queues: queues.clone(),
    };
    pump.submit(work).await
}
1330
/// EP0 control-plane loop: waits for FunctionFS events and dispatches them.
///
/// Lifecycle events (bind/unbind/enable/disable/suspend/resume) are forwarded
/// to `signals`; vendor setup packets are handed to `handler`, with parsed
/// CONFIG_EXPORTS payloads pushed into `tx` for the main event loop.
///
/// Exits cleanly when `stop` flips; returns an error only if FunctionFS event
/// I/O itself fails.
async fn control_loop(
    mut custom: Custom,
    handler: GadgetControl,
    status: GadgetStatusShared,
    signals: Ep0Signals,
    mut stop: watch::Receiver<bool>,
    tx: mpsc::Sender<ConfigExportsV0>,
) -> Result<()> {
    loop {
        // Race shutdown against event readiness; only readiness falls through
        // to the blocking-free event() read below.
        tokio::select! {
            _ = stop.changed() => {
                debug!("control loop stopping on shutdown signal");
                return Ok(());
            }
            result = custom.wait_event() => {
                result.context("wait for FunctionFS event")?;
            }
        }
        let event = custom.event().context("read FunctionFS event")?;
        match event {
            usb_gadget::function::custom::Event::Bind => {
                debug!("FunctionFS bind event (control loop)");
                signals.push_lifecycle(Event::Bind).await;
            }
            usb_gadget::function::custom::Event::Unbind => {
                debug!("FunctionFS unbind event (control loop)");
                signals.push_lifecycle(Event::Unbind).await;
            }
            usb_gadget::function::custom::Event::Enable => {
                debug!("FunctionFS enable event (control loop)");
                signals.push_lifecycle(Event::Enable).await;
            }
            usb_gadget::function::custom::Event::Disable => {
                debug!("FunctionFS disable event (control loop)");
                signals.push_lifecycle(Event::Disable).await;
            }
            usb_gadget::function::custom::Event::Suspend => {
                debug!("FunctionFS suspend event (control loop)");
                signals.push_lifecycle(Event::Suspend).await;
            }
            usb_gadget::function::custom::Event::Resume => {
                debug!("FunctionFS resume event (control loop)");
                signals.push_lifecycle(Event::Resume).await;
            }
            // Device-to-host setup: handler writes the response; on failure the
            // transfer is stalled so the host sees an error.
            usb_gadget::function::custom::Event::SetupDeviceToHost(sender) => {
                let report = status.report().await;
                let setup = setup_from_ctrl_req(sender.ctrl_req());
                let mut io = UsbControlIo::from_sender(sender);
                if let Err(err) = handler.handle_setup_packet(&mut io, setup, &report).await {
                    warn!(error = ?err, "vendor setup handling failed");
                    let _ = io.stall().await;
                } else if is_status_setup(&setup) {
                    signals.mark_status_ping();
                }
            }
            // Host-to-device setup: may carry a CONFIG_EXPORTS payload, which is
            // forwarded best-effort (dropped if the channel is full or closed).
            usb_gadget::function::custom::Event::SetupHostToDevice(receiver) => {
                let report = status.report().await;
                let setup = setup_from_ctrl_req(receiver.ctrl_req());
                let mut io = UsbControlIo::from_receiver(receiver);
                match handler.handle_setup_packet(&mut io, setup, &report).await {
                    Ok(Some(SetupCommand::Config(payload))) => match tx.try_send(payload) {
                        Ok(()) => {}
                        Err(TrySendError::Closed(_)) => {
                            warn!("CONFIG_EXPORTS channel closed; dropping payload");
                        }
                        Err(TrySendError::Full(_)) => {
                            warn!("CONFIG_EXPORTS channel full; dropping payload");
                        }
                    },
                    Ok(None) => {
                        if is_status_setup(&setup) {
                            signals.mark_status_ping();
                        }
                    }
                    Err(err) => {
                        warn!(error = ?err, "vendor setup handling failed");
                        let _ = io.stall().await;
                    }
                }
            }
            usb_gadget::function::custom::Event::Unknown(code) => {
                debug!(event = code, "FunctionFS unknown event");
            }
            _ => {}
        }
    }
}
1418
1419fn opcode_from_ublk(op: UblkOp) -> Option<OpCode> {
1420 match op {
1421 UblkOp::Read => Some(OpCode::Read),
1422 UblkOp::Write => Some(OpCode::Write),
1423 UblkOp::Flush => Some(OpCode::Flush),
1424 UblkOp::Discard => Some(OpCode::Discard),
1425 UblkOp::Unknown(_) => None,
1426 }
1427}
1428
/// Packs a (queue, tag) pair into a single 32-bit request identifier:
/// queue id in the high 16 bits, tag in the low 16 bits.
fn make_request_id(queue_id: u16, tag: u16) -> u32 {
    let hi = u32::from(queue_id) << 16;
    let lo = u32::from(tag);
    hi | lo
}
1432
1433fn request_byte_len(req: &UblkIoRequest, block_size: usize) -> io::Result<usize> {
1434 let sectors = usize::try_from(req.num_sectors)
1435 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "sector count overflow"))?;
1436 sectors
1437 .checked_mul(block_size)
1438 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "request byte length overflow"))
1439}
1440
1441fn errno_from_io(err: &io::Error) -> i32 {
1442 err.raw_os_error().unwrap_or_else(|| match err.kind() {
1443 io::ErrorKind::Unsupported => libc::EOPNOTSUPP,
1444 io::ErrorKind::PermissionDenied => libc::EACCES,
1445 io::ErrorKind::UnexpectedEof => libc::EIO,
1446 io::ErrorKind::NotFound => libc::ENOENT,
1447 io::ErrorKind::InvalidInput => libc::EINVAL,
1448 _ => libc::EIO,
1449 })
1450}
1451
1452fn setup_from_ctrl_req(ctrl: &CtrlReq) -> SetupPacket {
1453 SetupPacket::from_fields(
1454 ctrl.request_type,
1455 ctrl.request,
1456 ctrl.value,
1457 ctrl.index,
1458 ctrl.length,
1459 )
1460}
1461
1462fn is_status_setup(setup: &SetupPacket) -> bool {
1463 setup.request() == SMOO_STATUS_REQUEST && setup.request_type() == SMOO_STATUS_REQ_TYPE
1464}
1465
/// Direction-specific half of a single EP0 control transfer. Each side is
/// one-shot: the inner value is `take()`n out of the `Option` on first use.
enum UsbControlInner<'a> {
    // IN transfer: device-to-host response path.
    In(Option<CtrlSender<'a>>),
    // OUT transfer: host-to-device payload path.
    Out(Option<CtrlReceiver<'a>>),
}
1470
/// `ControlIo` adapter wrapping one FunctionFS EP0 setup transaction.
struct UsbControlIo<'a> {
    // One-shot sender or receiver for this setup packet.
    inner: UsbControlInner<'a>,
}
1474
1475impl<'a> UsbControlIo<'a> {
1476 fn from_sender(sender: CtrlSender<'a>) -> Self {
1477 Self {
1478 inner: UsbControlInner::In(Some(sender)),
1479 }
1480 }
1481
1482 fn from_receiver(receiver: CtrlReceiver<'a>) -> Self {
1483 Self {
1484 inner: UsbControlInner::Out(Some(receiver)),
1485 }
1486 }
1487}
1488
1489#[async_trait::async_trait]
1490impl ControlIo for UsbControlIo<'_> {
1491 async fn write_in(&mut self, data: &[u8]) -> Result<()> {
1492 match &mut self.inner {
1493 UsbControlInner::In(sender) => {
1494 let sender = sender.take().context("control sender already used")?;
1495 sender
1496 .send(data)
1497 .with_context(|| format!("send control response of {} bytes", data.len()))
1498 .map(|_| ())
1499 }
1500 UsbControlInner::Out(_) => Ok(()),
1501 }
1502 }
1503
1504 async fn read_out(&mut self, buf: &mut [u8]) -> Result<()> {
1505 match &mut self.inner {
1506 UsbControlInner::Out(receiver) => {
1507 let receiver = receiver.take().context("control receiver already used")?;
1508 let read = receiver
1509 .recv(buf)
1510 .with_context(|| format!("read control payload of {} bytes", buf.len()))?;
1511 ensure!(read == buf.len(), "control payload truncated");
1512 Ok(())
1513 }
1514 UsbControlInner::In(_) => Err(anyhow!("attempted to read_out on IN control transfer")),
1515 }
1516 }
1517
1518 async fn stall(&mut self) -> Result<()> {
1519 match &mut self.inner {
1520 UsbControlInner::In(sender) => {
1521 let sender = sender.take().context("control sender already used")?;
1522 sender.halt().context("stall control sender")
1523 }
1524 UsbControlInner::Out(receiver) => {
1525 let receiver = receiver.take().context("control receiver already used")?;
1526 receiver.halt().context("stall control receiver")
1527 }
1528 }
1529 }
1530}
1531async fn initialize_session(_ublk: &mut SmooUblk, state_store: &mut StateStore) -> Result<()> {
1532 if state_store.records().is_empty() {
1533 if state_store.path().is_some() {
1534 debug!("state file present but no exports recorded; nothing to recover");
1535 }
1536 return Ok(());
1537 }
1538
1539 let mut seen = HashSet::new();
1540 let mut reset = false;
1541 for record in state_store.records() {
1542 if !seen.insert(record.export_id) {
1543 warn!(
1544 export_id = record.export_id,
1545 "state file contains duplicate export_id; clearing state"
1546 );
1547 reset = true;
1548 break;
1549 }
1550 if let Err(err) = validate_persisted_record(record) {
1551 warn!(
1552 export_id = record.export_id,
1553 error = ?err,
1554 "state file entry invalid; clearing state"
1555 );
1556 reset = true;
1557 break;
1558 }
1559 }
1560
1561 if reset {
1562 reset_state_store(state_store);
1563 let _ = state_store.persist();
1564 }
1565 Ok(())
1566}
1567
/// Prepares to adopt ublk devices left behind by a prior smoo-gadget process.
///
/// Surveys the owner pid of every device recorded in the state store, then:
/// - stale devices with no surviving owner: reset state and start fresh;
/// - more than one live foreign owner: reset state and bail (ambiguous);
/// - exactly one live foreign owner: SIGHUP it and wait for it to release
///   its devices before this process adopts them.
async fn adopt_prepare(ublk: &mut SmooUblk, state_store: &mut StateStore) -> Result<()> {
    let mut dev_ids = Vec::new();
    let mut owner_pids = HashSet::new();
    let mut stale_devices = false;
    for record in state_store.records() {
        if let Some(dev_id) = record.assigned_dev_id {
            dev_ids.push(dev_id);
            match ublk.owner_pid(dev_id).await {
                Ok(pid) => {
                    let alive = pid_is_alive(pid);
                    debug!(dev_id, pid, alive, "queried ublk owner");
                    // Only live owners other than ourselves count as survivors.
                    if pid > 0 && pid != unsafe { libc::getpid() } && alive {
                        owner_pids.insert(pid);
                    } else if pid > 0 && !alive {
                        stale_devices = true;
                    }
                }
                Err(err) => {
                    // A missing device (ENOENT/EINVAL) means the record is stale.
                    let missing = error_is_missing(&err);
                    warn!(dev_id, error = ?err, missing, "query owner pid failed");
                    if missing {
                        stale_devices = true;
                    }
                }
            }
        }
    }

    if stale_devices && owner_pids.is_empty() {
        warn!("no surviving owners and stale devices detected; resetting state for fresh session");
        reset_state_store(state_store);
        if let Err(err) = state_store.persist() {
            warn!(error = ?err, "persist state reset failed");
        }
        return Ok(());
    }

    if owner_pids.len() > 1 {
        warn!(
            owners = ?owner_pids,
            "multiple ublk owners detected; resetting state for clean session"
        );
        reset_state_store(state_store);
        if let Err(err) = state_store.persist() {
            warn!(error = ?err, "persist state reset failed");
        }
        anyhow::bail!("multiple ublk owners detected during adopt");
    }

    if let Some(pid) = owner_pids.into_iter().next() {
        info!(pid, "signaling existing smoo-gadget owner for recovery");
        // SAFETY: plain kill(2) with a valid signal; no memory safety concerns.
        unsafe {
            libc::kill(pid, libc::SIGHUP);
        }
        info!(pid, "waiting for prior owner to exit before adopting");
        wait_for_owner_exit(ublk, &dev_ids, pid, Duration::from_secs(3)).await?;
    }

    Ok(())
}
1628
1629async fn wait_for_owner_exit(
1630 ublk: &mut SmooUblk,
1631 dev_ids: &[u32],
1632 target_pid: i32,
1633 timeout: Duration,
1634) -> Result<()> {
1635 let deadline = Instant::now() + timeout;
1636 loop {
1637 let mut still_owned = false;
1638 for dev_id in dev_ids {
1639 match ublk.owner_pid(*dev_id).await {
1640 Ok(pid) => {
1641 debug!(dev_id, pid, target_pid, "owner check during adopt wait");
1642 if pid == target_pid {
1643 still_owned = true;
1644 } else if pid > 0 && pid != target_pid {
1645 anyhow::bail!(
1646 "device {dev_id} now owned by unexpected pid {pid} during adopt"
1647 );
1648 }
1649 }
1650 Err(err) => {
1651 warn!(dev_id, error = ?err, "owner pid query failed during adopt wait");
1652 }
1653 }
1654 }
1655 if !still_owned {
1656 return Ok(());
1657 }
1658 if Instant::now() >= deadline {
1659 anyhow::bail!("owner pid {target_pid} still active after adopt wait");
1660 }
1661 tokio::time::sleep(Duration::from_millis(100)).await;
1662 }
1663}
1664
1665fn reset_state_store(state_store: &mut StateStore) {
1666 let path = state_store.path().map(Path::to_path_buf);
1667 *state_store = match path {
1668 Some(path) => StateStore::new_with_path(path),
1669 None => StateStore::new(),
1670 };
1671}
1672
1673fn count_active_exports(exports: &HashMap<u32, ExportController>) -> u32 {
1674 exports
1675 .values()
1676 .filter(|ctrl| ctrl.is_active_for_status())
1677 .count() as u32
1678}
1679
/// Applies a CONFIG_EXPORTS payload to the runtime.
///
/// An empty payload tears down every export (stopping devices, clearing the
/// state file, zeroing the status count). Otherwise the desired set is
/// diffed against the current exports: unchanged specs are kept (preserving
/// their devices), changed or removed exports are stopped, and new ones are
/// created in the `New` state for later reconciliation.
async fn apply_config(
    ublk: &mut SmooUblk,
    runtime: &mut RuntimeState,
    config: ConfigExportsV0,
) -> Result<()> {
    let entries = config.entries();
    let desired_records = if entries.is_empty() {
        Vec::new()
    } else {
        config_entries_to_records(entries)?
    };

    // Empty config: full teardown path.
    if desired_records.is_empty() {
        for controller in runtime.exports.values_mut() {
            if let Some((ctrl, queues)) = controller.take_device_handles() {
                ublk.stop_dev(SmooUblkDevice::from_parts(ctrl, queues), true)
                    .await
                    .context("stop ublk device before applying CONFIG_EXPORTS")?;
            }
        }
        runtime.exports.clear();
        runtime.reconcile_queue.clear();
        runtime.state_store().replace_all(Vec::new());
        if let Err(err) = runtime.state_store().persist() {
            warn!(error = ?err, "failed to clear state file");
        }
        runtime.status().set_export_count(0).await;
        return Ok(());
    }

    let desired_specs: HashMap<u32, ExportSpec> = desired_records
        .iter()
        .map(|record| (record.export_id, record.spec.clone()))
        .collect();

    // Remove exports that are gone or whose spec changed; an identical spec
    // keeps the running device untouched.
    let mut to_remove = Vec::new();
    for (export_id, controller) in runtime.exports.iter() {
        match desired_specs.get(export_id) {
            Some(spec) if spec == &controller.spec => {}
            _ => to_remove.push(*export_id),
        }
    }
    for export_id in to_remove {
        if let Some(mut controller) = runtime.exports.remove(&export_id) {
            if let Some((ctrl, queues)) = controller.take_device_handles() {
                ublk.stop_dev(SmooUblkDevice::from_parts(ctrl, queues), true)
                    .await
                    .with_context(|| format!("stop ublk device for export {export_id}"))?;
            }
        }
    }

    // Create controllers for exports we don't have yet.
    for record in &desired_records {
        runtime.exports.entry(record.export_id).or_insert_with(|| {
            ExportController::new(record.export_id, record.spec.clone(), ExportState::New)
        });
    }

    // Persist records with whatever dev ids surviving exports already hold.
    let mut new_records = Vec::with_capacity(desired_records.len());
    for mut record in desired_records {
        if let Some(ctrl) = runtime.exports.get(&record.export_id) {
            record.assigned_dev_id = ctrl.dev_id();
        }
        new_records.push(record);
    }

    runtime.state_store().replace_all(new_records);
    if let Err(err) = runtime.state_store().persist() {
        warn!(error = ?err, "failed to write state store");
    }
    // Drop queued reconcile work for exports that no longer exist.
    runtime
        .reconcile_queue
        .retain(|export_id| runtime.exports.contains_key(export_id));
    runtime
        .status()
        .set_export_count(count_active_exports(&runtime.exports))
        .await;
    Ok(())
}
1764
/// RAII holder for the registered USB gadget; held only for its Drop side
/// effect (presumably configfs deregistration — confirm against usb_gadget's
/// `RegGadget` drop behavior).
struct GadgetGuard {
    _registration: RegGadget,
}
1768
/// Sets up the USB gadget and returns the EP0 handle, the opened data
/// endpoints, an optional registration guard, and the FunctionFS directory.
///
/// Two modes:
/// - `--ffs-dir` given: attach to an existing FunctionFS mount; no configfs
///   changes are made and no guard is returned.
/// - otherwise: remove all existing gadgets, register a fresh one via
///   configfs, bind it to the default UDC, and open the endpoints. The
///   returned `GadgetGuard` keeps the registration alive.
fn setup_configfs(
    args: &Args,
) -> Result<(Custom, FunctionfsEndpoints, Option<GadgetGuard>, PathBuf)> {
    if let Some(ffs_dir) = args.ffs_dir.as_ref() {
        info!(
            ffs_dir = %ffs_dir.display(),
            "using existing FunctionFS directory; skipping configfs setup"
        );
        let custom = configfs_builder(args)
            .existing(ffs_dir)
            .context("initialize FunctionFS in existing directory")?;
        let endpoints = open_data_endpoints(ffs_dir)?;
        return Ok((custom, endpoints, None, ffs_dir.clone()));
    }

    usb_gadget::remove_all().context("remove existing USB gadgets")?;
    let (mut custom, handle) = configfs_builder(args).build();

    // Device-level class/subclass/protocol mirrors the interface identity
    // (optionally mimicking fastboot for host-side discovery).
    let (subclass, protocol) = interface_identity(args);
    let klass = Class::new(SMOO_CLASS, subclass, protocol);
    let id = Id::new(args.vendor_id, args.product_id);
    let strings = Strings::new("smoo", "smoo gadget", "0001");
    let udc = usb_gadget::default_udc().context("locate UDC")?;
    let gadget =
        Gadget::new(klass, id, strings).with_config(Config::new("config").with_function(handle));
    let reg = gadget.register().context("register gadget")?;

    // The FunctionFS dir only exists after registration; bind to the UDC
    // before opening the data endpoints.
    let ffs_dir = custom.ffs_dir().context("resolve FunctionFS dir")?;
    reg.bind(Some(&udc)).context("bind gadget to UDC")?;

    let endpoints = open_data_endpoints(&ffs_dir)?;

    Ok((
        custom,
        endpoints,
        Some(GadgetGuard { _registration: reg }),
        ffs_dir,
    ))
}
1808
1809fn configfs_builder(args: &Args) -> CustomBuilder {
1810 let (subclass, protocol) = interface_identity(args);
1811 Custom::builder().with_interface(
1812 Interface::new(Class::vendor_specific(subclass, protocol), "smoo")
1813 .with_endpoint(interrupt_in_ep())
1814 .with_endpoint(interrupt_out_ep())
1815 .with_endpoint(bulk_in_ep())
1816 .with_endpoint(bulk_out_ep()),
1817 )
1818}
1819
1820fn interface_identity(args: &Args) -> (u8, u8) {
1821 if args.mimic_fastboot {
1822 (FASTBOOT_SUBCLASS, FASTBOOT_PROTOCOL)
1823 } else {
1824 (SMOO_SUBCLASS, SMOO_PROTOCOL)
1825 }
1826}
1827
1828fn open_data_endpoints(ffs_dir: &Path) -> Result<FunctionfsEndpoints> {
1829 let interrupt_in = open_endpoint_fd(ffs_dir.join("ep1")).context("open interrupt IN")?;
1830 let interrupt_out = open_endpoint_fd(ffs_dir.join("ep2")).context("open interrupt OUT")?;
1831 let bulk_in = open_endpoint_fd(ffs_dir.join("ep3")).context("open bulk IN")?;
1832 let bulk_out = open_endpoint_fd(ffs_dir.join("ep4")).context("open bulk OUT")?;
1833 Ok(FunctionfsEndpoints::new(
1834 interrupt_in,
1835 interrupt_out,
1836 bulk_in,
1837 bulk_out,
1838 ))
1839}
1840
1841fn interrupt_in_ep() -> Endpoint {
1842 let (_, dir) = EndpointDirection::device_to_host();
1843 make_ep(dir, TransferType::Interrupt, 1024)
1844}
1845
1846fn interrupt_out_ep() -> Endpoint {
1847 let (_, dir) = EndpointDirection::host_to_device();
1848 make_ep(dir, TransferType::Interrupt, 1024)
1849}
1850
1851fn bulk_in_ep() -> Endpoint {
1852 let (_, dir) = EndpointDirection::device_to_host();
1853 make_ep(dir, TransferType::Bulk, 512)
1854}
1855
1856fn bulk_out_ep() -> Endpoint {
1857 let (_, dir) = EndpointDirection::host_to_device();
1858 make_ep(dir, TransferType::Bulk, 512)
1859}
1860
1861fn make_ep(direction: EndpointDirection, ty: TransferType, packet_size: u16) -> Endpoint {
1862 let mut ep = match ty {
1863 TransferType::Bulk => Endpoint::bulk(direction),
1864 _ => Endpoint::custom(direction, ty),
1865 };
1866 ep.max_packet_size_hs = packet_size;
1867 ep.max_packet_size_ss = packet_size;
1868 if matches!(ty, TransferType::Interrupt) {
1869 ep.interval = 1;
1870 }
1871 ep
1872}
1873
1874fn open_endpoint_fd(path: PathBuf) -> Result<OwnedFd> {
1875 let file = File::options()
1876 .read(true)
1877 .write(true)
1878 .open(&path)
1879 .with_context(|| format!("open {}", path.display()))?;
1880 Ok(to_owned_fd(file))
1881}
1882
/// Converts an open `File` into an `OwnedFd`, transferring ownership of the
/// underlying descriptor.
///
/// Uses the std `impl From<File> for OwnedFd` (stable since Rust 1.63), which
/// replaces the previous `into_raw_fd`/`from_raw_fd` round-trip and removes
/// the `unsafe` block entirely.
fn to_owned_fd(file: File) -> OwnedFd {
    OwnedFd::from(file)
}
1887
/// Tears down all ublk devices at shutdown.
///
/// Queue tasks are aborted (forceful) or drained (graceful) first. Each
/// export's device is then either dropped outright (forceful) or stopped via
/// the control path; devices whose graceful stop fails — or which have a dev
/// id but no live handles — are force-removed with retry as a last resort.
async fn cleanup_ublk_devices(
    ublk: &mut SmooUblk,
    runtime: &mut RuntimeState,
    shutdown_reason: Option<&str>,
    forceful: bool,
) -> Result<()> {
    for (_, tasks) in runtime.queue_tasks.drain() {
        if forceful {
            tasks.abort();
        } else {
            tasks.shutdown().await;
        }
    }
    let mut force_remove_ids = Vec::new();
    for controller in runtime.exports.values_mut() {
        if let Some((ctrl, queues)) = controller.take_device_handles() {
            let dev_id = ctrl.dev_id();
            if forceful {
                info!(
                    dev_id,
                    shutdown_reason = shutdown_reason.unwrap_or("unspecified"),
                    "forceful shutdown: dropping ublk device handles"
                );
                // Dropping the handles abandons the device; the kernel-side
                // object is reclaimed by the force-remove pass below.
                drop(SmooUblkDevice::from_parts(ctrl, queues));
                force_remove_ids.push(dev_id);
            } else {
                info!(
                    dev_id,
                    shutdown_reason = shutdown_reason.unwrap_or("unspecified"),
                    "stopping ublk device"
                );
                if let Err(err) = ublk
                    .stop_dev(SmooUblkDevice::from_parts(ctrl, queues), true)
                    .await
                {
                    warn!(
                        dev_id,
                        error = ?err,
                        "graceful stop failed; will force-remove"
                    );
                    force_remove_ids.push(dev_id);
                }
            }
        } else if let Some(dev_id) = controller.dev_id() {
            // Device exists in the kernel but we hold no handles for it.
            force_remove_ids.push(dev_id);
        }
    }

    for dev_id in force_remove_ids {
        force_remove_with_retry(ublk, dev_id).await?;
    }
    Ok(())
}
1941
1942async fn force_remove_with_retry(ublk: &mut SmooUblk, dev_id: u32) -> Result<()> {
1943 let mut attempt: u32 = 0;
1944 loop {
1945 attempt = attempt.wrapping_add(1);
1946 match ublk.force_remove_device(dev_id).await {
1947 Ok(()) => {
1948 info!(dev_id, attempt, "force-removed ublk device");
1949 break;
1950 }
1951 Err(err) => {
1952 if error_is_errno(&err, libc::ENOENT) {
1953 info!(dev_id, attempt, "ublk device already absent");
1954 break;
1955 }
1956 warn!(
1957 dev_id,
1958 attempt,
1959 error = ?err,
1960 "force-remove ublk device failed; retrying"
1961 );
1962 }
1963 }
1964 tokio::time::sleep(Duration::from_millis(250)).await;
1965 }
1966 Ok(())
1967}
1968
1969async fn begin_user_recovery(ublk: &mut SmooUblk, runtime: &mut RuntimeState) -> Result<()> {
1970 ublk.preserve_devices_on_drop();
1971 for (_, tasks) in runtime.queue_tasks.drain() {
1972 tasks.shutdown().await;
1973 }
1974 let mut dev_ids = Vec::new();
1975 for ctrl in runtime.exports.values_mut() {
1976 if let Some((ctrl, queues)) = ctrl.take_device_handles() {
1977 dev_ids.push(ctrl.dev_id());
1978 drop(SmooUblkDevice::from_parts(ctrl, queues));
1979 } else if let Some(dev_id) = ctrl.dev_id() {
1980 dev_ids.push(dev_id);
1981 }
1982 }
1983 for dev_id in dev_ids {
1984 if let Err(err) = ublk.start_user_recovery(dev_id).await {
1985 warn!(dev_id, error = ?err, "start user recovery failed");
1986 }
1987 }
1988 Ok(())
1989}
1990
1991async fn process_link_commands(
1992 runtime: &mut RuntimeState,
1993 link: &mut LinkController,
1994) -> Result<()> {
1995 if let Some(LinkCommand::Fatal) = link.take_command() {
1996 let reason = link.last_offline_reason();
1997 let active_exports = count_active_exports(&runtime.exports);
1998 warn!(
1999 ?reason,
2000 state = ?link.state(),
2001 active_exports,
2002 "link controller emitted fatal command; keeping ublk runtime alive"
2003 );
2004 }
2005 Ok(())
2006}
2007
2008async fn handle_data_plane_event(
2009 runtime: &mut RuntimeState,
2010 link: &mut LinkController,
2011 event: DataPlaneEvent,
2012) -> Result<()> {
2013 match event {
2014 DataPlaneEvent::IoError { epoch, error } => {
2015 if epoch != runtime.data_plane_epoch {
2016 trace!(
2017 event_epoch = epoch,
2018 current_epoch = runtime.data_plane_epoch,
2019 "ignoring stale data plane error"
2020 );
2021 return Ok(());
2022 }
2023 warn!(
2024 error = ?error,
2025 event_epoch = epoch,
2026 current_epoch = runtime.data_plane_epoch,
2027 "data plane I/O error; requesting link offline"
2028 );
2029 link.on_io_error(&error);
2030 }
2031 }
2032 process_link_commands(runtime, link).await?;
2033 Ok(())
2034}
2035
/// Processes one event from a ublk queue task.
///
/// `Request` events are dispatched to the host via the I/O pump when the
/// export, its device handle, the link, and the gadget are all ready;
/// otherwise the request is parked in `outstanding` for later replay.
/// `QueueError` events fail the export, complete all of its parked requests
/// with `ENOLINK`, and force the link offline.
async fn handle_queue_event(
    runtime: &mut RuntimeState,
    link: &mut LinkController,
    outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
    _data_plane_tx: &mpsc::UnboundedSender<DataPlaneEvent>,
    event: QueueEvent,
) -> Result<()> {
    match event {
        QueueEvent::Request {
            export_id,
            dev_id,
            request,
            queues,
        } => {
            // Unknown export: drop silently (it was removed under us).
            let Some(ctrl) = runtime.exports.get_mut(&export_id) else {
                return Ok(());
            };
            // No device handle yet: park until the device comes up.
            let Some(handle) = ctrl.device_handle() else {
                park_request(outstanding, export_id, dev_id, queues.clone(), request);
                return Ok(());
            };
            if handle.dev_id() != dev_id {
                trace!(export_id, dev_id, "dropping request for stale device id");
                return Ok(());
            }
            let handle_ready = matches!(
                handle,
                DeviceHandle::Online { .. } | DeviceHandle::Starting { .. }
            );
            // Everything must be ready before dispatching; otherwise park and
            // let the reconcile path replay the request later.
            if !matches!(link.state(), LinkState::Online)
                || runtime.gadget.is_none()
                || !handle_ready
            {
                trace!(
                    export_id,
                    queue = request.queue_id,
                    tag = request.tag,
                    "link not online; parking IO"
                );
                park_request(outstanding, export_id, dev_id, queues.clone(), request);
                return Ok(());
            }
            let Some(pump) = runtime.io_pump.as_ref() else {
                park_request(outstanding, export_id, dev_id, queues.clone(), request);
                return Ok(());
            };
            trace!(
                export_id,
                dev_id,
                queue = request.queue_id,
                tag = request.tag,
                op = ?request.op,
                sector = request.sector,
                num_sectors = request.num_sectors,
                "dispatch ublk request to host"
            );
            // Dispatch failure parks the request (so it is replayed after
            // recovery) and forces the link offline.
            if let Err(err) = handle_request(pump.clone(), export_id, queues.clone(), request).await
            {
                let io_err = io_error_from_anyhow(&err);
                link.on_io_error(&io_err);
                park_request(outstanding, export_id, dev_id, queues.clone(), request);
                warn!(
                    export_id,
                    dev_id,
                    queue = request.queue_id,
                    tag = request.tag,
                    io_kind = ?io_err.kind(),
                    io_errno = io_err.raw_os_error(),
                    error = ?err,
                    "request dispatch failed; parked and forcing link offline"
                );
            }
        }
        QueueEvent::QueueError {
            export_id,
            dev_id,
            error,
        } => {
            warn!(
                export_id,
                dev_id,
                error = ?error,
                "queue task error; marking export failed and forcing link offline"
            );
            if let Some(ctrl) = runtime.exports.get_mut(&export_id) {
                ctrl.fail_device(format!("device {dev_id} queue task error: {error:#}"));
            }
            // Flush parked requests for the failed export back to ublk.
            if let Some(mut pending) = outstanding.remove(&export_id) {
                for ((_queue_id, _tag), req) in pending.drain() {
                    let _ = req.queues.complete_io(req.request, -libc::ENOLINK);
                }
            }
            link.on_io_error(&io::Error::other("queue task error"));
        }
    }
    Ok(())
}
2133
2134fn park_request(
2135 outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
2136 export_id: u32,
2137 dev_id: u32,
2138 queues: Arc<UblkQueueRuntime>,
2139 req: UblkIoRequest,
2140) {
2141 let entry = outstanding.entry(export_id).or_default();
2142 entry.insert(
2143 (req.queue_id, req.tag),
2144 OutstandingRequest {
2145 dev_id,
2146 request: req,
2147 queues,
2148 },
2149 );
2150}
2151
2152fn prune_outstanding_for_missing_exports(
2153 outstanding: &mut HashMap<u32, HashMap<(u16, u16), OutstandingRequest>>,
2154 exports: &HashMap<u32, ExportController>,
2155) {
2156 let mut to_fail = Vec::new();
2157 for export_id in outstanding.keys() {
2158 if !exports.contains_key(export_id) {
2159 to_fail.push(*export_id);
2160 }
2161 }
2162 for export_id in to_fail {
2163 if let Some(mut pending) = outstanding.remove(&export_id) {
2164 for ((_queue_id, _tag), req) in pending.drain() {
2165 let _ = req.queues.complete_io(req.request, -libc::ENODEV);
2166 }
2167 }
2168 }
2169}
2170
2171fn io_error_from_anyhow(err: &anyhow::Error) -> io::Error {
2172 if let Some(cause) = err
2173 .chain()
2174 .find_map(|cause| cause.downcast_ref::<io::Error>())
2175 {
2176 io::Error::new(cause.kind(), cause.to_string())
2177 } else {
2178 io::Error::other(err.to_string())
2179 }
2180}
2181
/// Records the first exit reason observed; later calls are ignored so the
/// root cause is preserved.
fn note_exit_reason(exit_reason: &mut Option<String>, reason: impl Into<String>) {
    exit_reason.get_or_insert_with(|| reason.into());
}
2187
2188fn error_is_errno(err: &anyhow::Error, code: i32) -> bool {
2189 err.chain()
2190 .find_map(|cause| cause.downcast_ref::<std::io::Error>())
2191 .and_then(|io_err| io_err.raw_os_error())
2192 == Some(code)
2193}
2194
2195fn error_is_missing(err: &anyhow::Error) -> bool {
2196 err.chain()
2197 .find_map(|cause| cause.downcast_ref::<std::io::Error>())
2198 .and_then(|io_err| io_err.raw_os_error())
2199 .is_some_and(|code| code == libc::ENOENT || code == libc::EINVAL)
2200}
2201
2202fn pid_is_alive(pid: i32) -> bool {
2203 if pid <= 0 {
2204 return false;
2205 }
2206 let res = unsafe { libc::kill(pid, 0) };
2207 if res == 0 {
2208 return true;
2209 }
2210 let err = std::io::Error::last_os_error();
2211 !matches!(err.raw_os_error(), Some(libc::ESRCH))
2212}
2213
/// Parses a hexadecimal `u16` (e.g. a USB vendor/product id), with an
/// optional single `0x`/`0X` prefix.
///
/// Fix: the previous `trim_start_matches("0x")` stripped *repeated*
/// occurrences of the prefix, so malformed input like `"0x0x10"` was
/// silently accepted; `strip_prefix` removes the prefix at most once.
fn parse_hex_u16(input: &str) -> Result<u16, String> {
    let digits = input
        .strip_prefix("0x")
        .or_else(|| input.strip_prefix("0X"))
        .unwrap_or(input);
    u16::from_str_radix(digits, 16).map_err(|err| err.to_string())
}
2218
2219fn validate_persisted_record(record: &PersistedExportRecord) -> Result<()> {
2220 ensure!(
2221 record.export_id != 0,
2222 "persisted export_id must be non-zero"
2223 );
2224 let block_size = record.spec.block_size;
2225 ensure!(
2226 block_size.is_power_of_two(),
2227 "persisted block size must be power-of-two"
2228 );
2229 ensure!(
2230 (512..=65536).contains(&block_size),
2231 "persisted block size out of range"
2232 );
2233 ensure!(
2234 record.spec.size_bytes != 0,
2235 "persisted export size_bytes must be non-zero"
2236 );
2237 ensure!(
2238 record.spec.size_bytes.is_multiple_of(block_size as u64),
2239 "persisted export size_bytes must be multiple of block_size"
2240 );
2241 let blocks = record
2242 .spec
2243 .size_bytes
2244 .checked_div(block_size as u64)
2245 .context("persisted size_bytes smaller than block_size")?;
2246 ensure!(blocks > 0, "persisted export size too small");
2247 usize::try_from(blocks).context("persisted export block count overflows usize")?;
2248 Ok(())
2249}
2250
2251fn config_entries_to_records(entries: &[ConfigExport]) -> Result<Vec<PersistedExportRecord>> {
2252 let mut seen = HashSet::new();
2253 let mut records = Vec::with_capacity(entries.len());
2254 for export in entries {
2255 ensure!(
2256 seen.insert(export.export_id),
2257 "duplicate export_id {} in CONFIG_EXPORTS",
2258 export.export_id
2259 );
2260 let spec = build_spec_from_export(*export)?;
2261 records.push(PersistedExportRecord {
2262 export_id: export.export_id,
2263 spec,
2264 assigned_dev_id: None,
2265 });
2266 }
2267 Ok(records)
2268}
2269
2270fn build_spec_from_export(export: ConfigExport) -> Result<ExportSpec> {
2271 let block_size = export.block_size as usize;
2272 ensure!(
2273 export.size_bytes != 0,
2274 "CONFIG_EXPORTS size_bytes must be non-zero"
2275 );
2276 ensure!(
2277 export.size_bytes.is_multiple_of(block_size as u64),
2278 "CONFIG_EXPORTS size_bytes must be multiple of block_size"
2279 );
2280 let blocks = export
2281 .size_bytes
2282 .checked_div(block_size as u64)
2283 .context("size bytes smaller than block size")?;
2284 ensure!(blocks > 0, "export size too small");
2285 usize::try_from(blocks).context("export block count overflows usize")?;
2286 Ok(ExportSpec {
2287 block_size: export.block_size,
2288 size_bytes: export.size_bytes,
2289 flags: ExportFlags::empty(),
2290 })
2291}