1use super::io::{UblkDev, UblkTgt};
2use super::uring_async::UblkUringOpFuture;
3use super::{sys, UblkError, UblkFlags};
4use bitmaps::Bitmap;
5use derive_setters::*;
6use io_uring::{opcode, squeue, types, IoUring};
7use log::{error, trace};
8use serde::Deserialize;
9use std::cell::RefCell;
10use std::os::unix::io::AsRawFd;
11use std::sync::{Arc, RwLock};
12use std::{
13 fs,
14 io::{Read, Write},
15 path::Path,
16};
17
/// Char device through which all ublk control commands are issued.
const CTRL_PATH: &str = "/dev/ublk-control";

/// Upper bound on a single io buffer: 32 MiB.
const MAX_BUF_SZ: u32 = 32_u32 << 20;

std::thread_local! {
    // Per-thread control io_uring (128-byte SQEs, as required by uring-cmd).
    // Stays `None` until ublk_init_ctrl_task_ring() or a UblkCtrl constructor
    // installs a ring for this thread.
    pub(crate) static CTRL_URING: RefCell<Option<IoUring::<squeue::Entry128>>> =
        RefCell::new(None);
}
28
/// Run `$closure` with a shared borrow of this thread's control ring.
///
/// Panics if the ring has not been initialized on the calling thread.
#[macro_export]
macro_rules! with_ctrl_ring_internal {
    ($closure:expr) => {
        $crate::ctrl::CTRL_URING.with(|cell| {
            let ring_ref = cell.borrow();
            if let Some(ref ring) = *ring_ref {
                $closure(ring)
            } else {
                panic!("Control ring not initialized. Call ublk_init_ctrl_task_ring() first or use UblkCtrl constructor.")
            }
        })
    };
}

/// Mutable variant of [`with_ctrl_ring_internal!`]: runs `$closure` with an
/// exclusive borrow of this thread's control ring.
///
/// Panics if the ring has not been initialized on the calling thread.
#[macro_export]
macro_rules! with_ctrl_ring_mut_internal {
    ($closure:expr) => {
        $crate::ctrl::CTRL_URING.with(|cell| {
            let mut ring_ref = cell.borrow_mut();
            if let Some(ref mut ring) = *ring_ref {
                $closure(ring)
            } else {
                panic!("Control ring not initialized. Call ublk_init_ctrl_task_ring() first or use UblkCtrl constructor.")
            }
        })
    };
}
57
// Crate-internal aliases so sibling modules can use the macros without
// the `$crate::` path.
pub(crate) use with_ctrl_ring_internal;
pub(crate) use with_ctrl_ring_mut_internal;

/// Run `closure` with shared access to this thread's control ring.
///
/// # Panics
/// Panics if the ring has not been initialized on this thread.
pub fn with_ctrl_ring<T, F>(closure: F) -> T
where
    F: FnOnce(&IoUring<squeue::Entry128>) -> T,
{
    with_ctrl_ring_internal!(closure)
}

/// Run `closure` with exclusive access to this thread's control ring.
///
/// # Panics
/// Panics if the ring has not been initialized on this thread.
pub fn with_ctrl_ring_mut<T, F>(closure: F) -> T
where
    F: FnOnce(&mut IoUring<squeue::Entry128>) -> T,
{
    with_ctrl_ring_mut_internal!(closure)
}

/// Initialize (or inspect) this thread's control-ring slot.
///
/// `init_fn` receives the `Option` directly, so callers can install a custom
/// ring, replace one, or leave an existing ring untouched.
pub fn ublk_init_ctrl_task_ring<F>(init_fn: F) -> Result<(), UblkError>
where
    F: FnOnce(&mut Option<IoUring<squeue::Entry128>>) -> Result<(), UblkError>,
{
    CTRL_URING.with(|cell| {
        let mut ring_ref = cell.borrow_mut();
        init_fn(&mut *ring_ref)
    })
}
212
213pub(crate) fn init_ctrl_task_ring_default(depth: u32) -> Result<(), UblkError> {
218 ublk_init_ctrl_task_ring(|ring_opt| {
219 if ring_opt.is_none() {
220 let ring = IoUring::<squeue::Entry128>::builder()
221 .build(depth)
222 .map_err(UblkError::IOError)?;
223 *ring_opt = Some(ring);
224 }
225 Ok(())
226 })
227}
228
/// Bitmap of CPUs on which a ublk queue (and its handler thread) may run.
///
/// Backed by a fixed 1024-bit `bitmaps::Bitmap`, matching the cpumask buffer
/// size exchanged with the kernel's GET_QUEUE_AFFINITY control command.
#[derive(Debug, Default, Copy, Clone)]
pub struct UblkQueueAffinity {
    affinity: Bitmap<1024>,
}

impl UblkQueueAffinity {
    /// Empty mask (no CPU set).
    pub fn new() -> UblkQueueAffinity {
        UblkQueueAffinity {
            affinity: Bitmap::new(),
        }
    }

    /// Size in bytes of the raw cpumask buffer (1024 bits / 8).
    pub fn buf_len(&self) -> usize {
        1024 / 8
    }

    /// Raw pointer to the cpumask bytes; handed to the kernel as the
    /// copy-out buffer of GET_QUEUE_AFFINITY-style commands.
    pub fn addr(&self) -> *const u8 {
        self.affinity.as_bytes().as_ptr()
    }

    // Mutable alias of the same buffer. NOTE(review): this casts a pointer
    // obtained through a shared borrow to *mut; the writer must have
    // exclusive access for this to be sound.
    fn addr_mut(&mut self) -> *mut u8 {
        self.affinity.as_bytes().as_ptr() as *mut u8
    }

    /// Indices of all set CPUs, in ascending order.
    pub fn to_bits_vec(&self) -> Vec<usize> {
        self.affinity.into_iter().collect()
    }

    // Pseudo-randomly pick one CPU from the mask; `None` for an empty mask.
    fn get_random_cpu(&self) -> Option<usize> {
        let cpus: Vec<usize> = self.affinity.into_iter().collect();
        if cpus.is_empty() {
            return None;
        }

        // Cheap entropy: wall-clock nanoseconds mixed with the thread id.
        // Not cryptographic — only used to spread queues across CPUs.
        let mut seed = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos() as usize;

        unsafe {
            seed = seed.wrapping_add(libc::gettid() as usize);
        }

        Some(cpus[seed % cpus.len()])
    }

    /// Mask containing exactly `cpu`.
    pub fn from_single_cpu(cpu: usize) -> UblkQueueAffinity {
        let mut affinity = UblkQueueAffinity::new();
        affinity.affinity.set(cpu, true);
        affinity
    }

    /// Add `cpu` to the mask.
    pub fn set_cpu(&mut self, cpu: usize) {
        self.affinity.set(cpu, true);
    }

    /// Reset the mask so that only `cpu` is set.
    pub fn set_only_cpu(&mut self, cpu: usize) {
        self.affinity = Bitmap::new();
        self.affinity.set(cpu, true);
    }

    /// True when no CPU is set.
    pub fn is_empty(&self) -> bool {
        self.to_bits_vec().is_empty()
    }

    /// True when exactly one CPU is set.
    pub fn is_single_cpu(&self) -> bool {
        self.to_bits_vec().len() == 1
    }

    /// Lowest set CPU index, if any.
    pub fn get_first_cpu(&self) -> Option<usize> {
        self.to_bits_vec().first().copied()
    }
}
314
// 80-byte view of a control command: lets the typed struct be copied into
// the raw cmd area of a 128-byte uring-cmd SQE.
#[repr(C)]
union CtrlCmd {
    ctrl_cmd: sys::ublksrv_ctrl_cmd,
    buf: [u8; 80],
}

/// Bytes reserved at the front of an unprivileged command's buffer for the
/// NUL-padded "/dev/ublkcN" path the driver uses for permission checks.
const CTRL_UBLKC_PATH_MAX: usize = 32;
// Internal flags describing what a UblkCtrlCmdData carries / needs.
const CTRL_CMD_HAS_DATA: u32 = 1; // inline data word is valid
const CTRL_CMD_HAS_BUF: u32 = 2; // addr/len buffer is valid
const CTRL_CMD_BUF_READ: u32 = 8; // kernel writes the buffer (copy-out)
const CTRL_CMD_NO_NEED_DEV_PATH: u32 = 16; // skip unprivileged path prefix

/// Parameters of one ublk control command, before being packed into an SQE.
#[derive(Debug, Default, Copy, Clone)]
struct UblkCtrlCmdData {
    cmd_op: u32, // UBLK_{U_,}CMD_* opcode
    flags: u32, // CTRL_CMD_* flags above
    data: u64, // inline data word (CTRL_CMD_HAS_DATA)
    dev_path_len: u16, // path prefix length for unprivileged devices
    _pad: u16,
    _reserved: u32,

    addr: u64, // user buffer address (CTRL_CMD_HAS_BUF)
    len: u32, // user buffer length
}
345
impl UblkCtrlCmdData {
    /// Command carrying neither inline data nor a buffer.
    fn new_simple_cmd(cmd_op: u32) -> Self {
        Self {
            cmd_op,
            ..Default::default()
        }
    }

    /// Command carrying one inline data word.
    fn new_data_cmd(cmd_op: u32, data: u64) -> Self {
        Self {
            cmd_op,
            flags: CTRL_CMD_HAS_DATA,
            data,
            ..Default::default()
        }
    }

    /// Command with a buffer the kernel writes into (copy-out).
    fn new_read_buffer_cmd(cmd_op: u32, addr: u64, len: u32, no_dev_path: bool) -> Self {
        let mut flags = CTRL_CMD_HAS_BUF | CTRL_CMD_BUF_READ;
        if no_dev_path {
            flags |= CTRL_CMD_NO_NEED_DEV_PATH;
        }
        Self {
            cmd_op,
            flags,
            addr,
            len,
            ..Default::default()
        }
    }

    /// Command with a buffer the kernel reads from (copy-in).
    fn new_write_buffer_cmd(cmd_op: u32, addr: u64, len: u32, no_dev_path: bool) -> Self {
        let mut flags = CTRL_CMD_HAS_BUF;
        if no_dev_path {
            flags |= CTRL_CMD_NO_NEED_DEV_PATH;
        }
        Self {
            cmd_op,
            flags,
            addr,
            len,
            ..Default::default()
        }
    }

    /// Command with both an inline data word and a buffer; `read_buffer`
    /// selects copy-out direction.
    fn new_data_buffer_cmd(cmd_op: u32, data: u64, addr: u64, len: u32, read_buffer: bool) -> Self {
        let mut flags = CTRL_CMD_HAS_BUF | CTRL_CMD_HAS_DATA;
        if read_buffer {
            flags |= CTRL_CMD_BUF_READ;
        }
        Self {
            cmd_op,
            flags,
            data,
            addr,
            len,
            ..Default::default()
        }
    }

    /// Rewrite the command for unprivileged devices: allocate a new buffer
    /// laid out as `[NUL-padded "/dev/ublkcN" path (32 bytes)][original
    /// payload]` and point the command at it.
    ///
    /// Returns `(original_addr, Some(backing_buffer))` when the rewrite
    /// happened, `(0, None)` otherwise; the pair must be handed to
    /// `unprep_un_privileged_dev_path` after the command completes.
    fn prep_un_privileged_dev_path(&mut self, dev: &UblkCtrlInner) -> (u64, Option<Vec<u8>>) {
        let cmd_op = self.cmd_op & 0xff;

        // GET_DEV_INFO2 always carries the dev path; any other command only
        // when the device is unprivileged and the path wasn't opted out.
        if cmd_op != sys::UBLK_CMD_GET_DEV_INFO2
            && (!dev.is_unprivileged() || (self.flags & CTRL_CMD_NO_NEED_DEV_PATH) != 0)
        {
            return (0, None);
        }

        // Backing buffer: path prefix plus the original payload, if any.
        // The raw pointer stays valid because the Vec's heap allocation
        // does not move when the Vec itself is moved.
        let (buf, new_buf) = {
            let size = {
                if self.flags & CTRL_CMD_HAS_BUF != 0 {
                    self.len as usize + CTRL_UBLKC_PATH_MAX
                } else {
                    CTRL_UBLKC_PATH_MAX
                }
            };
            let mut v = vec![0_u8; size];

            (v.as_mut_ptr(), v)
        };

        let path_str = dev.get_cdev_path().to_string();
        assert!(path_str.len() <= CTRL_UBLKC_PATH_MAX);

        unsafe {
            // Zero-pad the 32-byte path slot, then copy path and payload in.
            libc::memset(buf as *mut libc::c_void, 0, CTRL_UBLKC_PATH_MAX);
            libc::memcpy(
                buf as *mut libc::c_void,
                path_str.as_ptr() as *const libc::c_void,
                path_str.len(),
            );

            if self.flags & CTRL_CMD_HAS_BUF != 0 {
                libc::memcpy(
                    (buf as u64 + CTRL_UBLKC_PATH_MAX as u64) as *mut libc::c_void,
                    self.addr as *const libc::c_void,
                    self.len as usize,
                );
            }
        }

        // Retarget the command at the combined buffer and record the prefix
        // length so the driver can locate the real payload.
        self.flags |= CTRL_CMD_HAS_BUF | CTRL_CMD_HAS_DATA;
        self.len += CTRL_UBLKC_PATH_MAX as u32;
        self.dev_path_len = CTRL_UBLKC_PATH_MAX as u16;
        let addr = self.addr;
        self.addr = buf as u64;
        (addr, Some(new_buf))
    }

    /// Undo `prep_un_privileged_dev_path`: for copy-out commands, copy the
    /// kernel-written payload (past the path prefix) back into the caller's
    /// original buffer `buf`.
    fn unprep_un_privileged_dev_path(&mut self, dev: &UblkCtrlInner, buf: u64) {
        let cmd_op = self.cmd_op & 0xff;

        // Mirror the predicate in prep_: nothing to undo if no rewrite ran.
        if cmd_op != sys::UBLK_CMD_GET_DEV_INFO2
            && (!dev.is_unprivileged() || (self.flags & CTRL_CMD_NO_NEED_DEV_PATH) != 0)
        {
            return;
        }

        let addr = self.addr + CTRL_UBLKC_PATH_MAX as u64;
        let len = self.len - CTRL_UBLKC_PATH_MAX as u32;
        if self.flags & CTRL_CMD_BUF_READ != 0 {
            unsafe {
                libc::memcpy(
                    buf as *mut libc::c_void,
                    addr as *const libc::c_void,
                    len as usize,
                );
            }
        }
    }
}
484
/// Shape of one `queues/<qid>` entry in the persisted device JSON.
#[derive(Debug, Deserialize)]
struct QueueAffinityJson {
    affinity: Vec<u32>, // CPUs the queue thread may run on
    qid: u32, // queue id
    tid: u32, // queue handler thread id
}

/// Owns the per-device JSON document persisted at
/// `{run_dir}/{dev_id:04}.json` plus helpers to read, write and query it.
#[derive(Debug)]
pub(crate) struct UblkJsonManager {
    dev_id: u32,
    json: serde_json::Value,
}
504
505impl UblkJsonManager {
506 pub fn new(dev_id: u32) -> Self {
508 Self {
509 dev_id,
510 json: serde_json::json!({}),
511 }
512 }
513
514 pub fn get_json_path(&self) -> String {
516 format!("{}/{:04}.json", UblkCtrl::run_dir(), self.dev_id)
517 }
518
519 pub fn get_json(&self) -> &serde_json::Value {
521 &self.json
522 }
523
524 pub fn get_json_mut(&mut self) -> &mut serde_json::Value {
526 &mut self.json
527 }
528
529 fn set_path_permission(path: &Path, mode: u32) -> Result<(), std::io::Error> {
531 use std::os::unix::fs::PermissionsExt;
532 let permissions = std::fs::Permissions::from_mode(mode);
533 std::fs::set_permissions(path, permissions)
534 }
535
536 pub fn flush_json(&mut self) -> Result<i32, UblkError> {
538 if self.json == serde_json::json!({}) {
539 return Ok(0);
540 }
541
542 let run_path = self.get_json_path();
544
545 let json_path = Path::new(&run_path);
546
547 if let Some(parent_dir) = json_path.parent() {
548 if !parent_dir.exists() {
549 std::fs::create_dir_all(parent_dir)?;
550 Self::set_path_permission(parent_dir, 0o777)?;
552 }
553 }
554
555 let mut run_file = fs::File::create(json_path)?;
556
557 Self::set_path_permission(json_path, 0o700)?;
559
560 run_file.write_all(self.json.to_string().as_bytes())?;
561 Ok(0)
562 }
563
564 pub fn reload_json(&mut self) -> Result<i32, UblkError> {
566 let mut file = fs::File::open(self.get_json_path())?;
567 let mut json_str = String::new();
568 file.read_to_string(&mut json_str)?;
569 self.json = serde_json::from_str(&json_str).map_err(UblkError::JsonError)?;
570 Ok(0)
571 }
572
573 pub fn set_json(&mut self, json: serde_json::Value) {
575 self.json = json;
576 }
577
578 pub fn update_dev_id(&mut self, new_dev_id: u32) {
580 self.dev_id = new_dev_id;
581 }
582
583 pub fn dump_json(&self) {
585 if !Path::new(&self.get_json_path()).exists() {
586 return;
587 }
588
589 let Ok(mut file) = fs::File::open(self.get_json_path()) else {
590 eprintln!("Warning: Failed to open JSON file for dumping");
591 return;
592 };
593
594 let mut json_str = String::new();
595 if file.read_to_string(&mut json_str).is_err() {
596 eprintln!("Warning: Failed to read JSON file content");
597 return;
598 }
599
600 let Ok(json_value): Result<serde_json::Value, _> = serde_json::from_str(&json_str) else {
601 eprintln!("Warning: Failed to parse JSON content");
602 return;
603 };
604
605 let queues = &json_value["queues"];
606
607 for i in 0..16 {
608 if let Some(queue) = queues.get(&i.to_string()) {
610 let this_queue: Result<QueueAffinityJson, _> =
611 serde_json::from_value(queue.clone());
612
613 if let Ok(p) = this_queue {
614 println!(
615 "\tqueue {} tid: {} affinity({})",
616 p.qid,
617 p.tid,
618 p.affinity
619 .iter()
620 .map(ToString::to_string)
621 .collect::<Vec<String>>()
622 .join(" ")
623 );
624 }
625 }
626 }
627 let tgt_val = &json_value["target"];
628 let tgt: Result<UblkTgt, _> = serde_json::from_value(tgt_val.clone());
629 if let Ok(p) = tgt {
630 println!(
631 "\ttarget {{\"dev_size\":{},\"name\":\"{}\",\"type\":0}}",
632 p.dev_size, p.tgt_type
633 );
634 }
635 println!("\ttarget_data {}", &json_value["target_data"]);
636 }
637
638 pub fn get_queue_tid_from_json(&self, qid: u16) -> Result<i32, UblkError> {
640 let queues = &self.json["queues"];
641 let queue = &queues[qid.to_string()];
642 let this_queue: Result<QueueAffinityJson, _> = serde_json::from_value(queue.clone());
643
644 if let Ok(p) = this_queue {
645 Ok(p.tid as i32)
646 } else {
647 Err(UblkError::OtherError(-libc::EEXIST))
648 }
649 }
650
651 pub fn get_target_flags_from_json(&self) -> Result<u32, UblkError> {
653 let __tgt_flags = &self.json["target_flags"];
654 let tgt_flags: Result<u32, _> = serde_json::from_value(__tgt_flags.clone());
655 if let Ok(t) = tgt_flags {
656 Ok(t)
657 } else {
658 Err(UblkError::OtherError(-libc::EINVAL))
659 }
660 }
661
662 pub fn get_target_from_json(&self) -> Result<super::io::UblkTgt, UblkError> {
664 let tgt_val = &self.json["target"];
665 let tgt: Result<super::io::UblkTgt, _> = serde_json::from_value(tgt_val.clone());
666 if let Ok(t) = tgt {
667 Ok(t)
668 } else {
669 Err(UblkError::OtherError(-libc::EINVAL))
670 }
671 }
672
673 pub fn get_target_data_from_json(&self) -> Option<serde_json::Value> {
675 let val = &self.json["target_data"];
676 if !val.is_null() {
677 Some(val.clone())
678 } else {
679 None
680 }
681 }
682
683 pub fn get_target_type_from_json(&self) -> Result<String, UblkError> {
685 if let Ok(tgt) = self.get_target_from_json() {
686 Ok(tgt.tgt_type)
687 } else {
688 Err(UblkError::OtherError(-libc::EINVAL))
689 }
690 }
691}
692
/// Plain-data bundle of every parameter needed to create a `UblkCtrlInner`;
/// avoids threading eight separate arguments through the constructors.
#[derive(Debug, Clone)]
struct UblkCtrlConfig {
    name: Option<String>, // target/device name
    id: i32, // requested device id, -1 = let the driver pick
    nr_queues: u32,
    depth: u32,
    io_buf_bytes: u32,
    flags: u64, // UBLK_F_* driver feature flags
    tgt_flags: u64, // target-private flags (ublksrv_flags)
    dev_flags: UblkFlags, // library-level behavior flags
}

impl UblkCtrlConfig {
    /// Bundle the given creation parameters.
    fn new(
        name: Option<String>,
        id: i32,
        nr_queues: u32,
        depth: u32,
        io_buf_bytes: u32,
        flags: u64,
        tgt_flags: u64,
        dev_flags: UblkFlags,
    ) -> Self {
        Self {
            name,
            id,
            nr_queues,
            depth,
            io_buf_bytes,
            flags,
            tgt_flags,
            dev_flags,
        }
    }
}
729
/// Builder for a [`UblkCtrl`] (or its async variant); `derive_setters`
/// generates a chainable setter for every field.
#[derive(Setters, Debug, PartialEq, Eq)]
pub struct UblkCtrlBuilder<'a> {
    /// Target/device name.
    name: &'a str,

    /// Requested device id; -1 lets the driver allocate one.
    id: i32,

    /// Number of hardware queues.
    nr_queues: u16,

    /// Per-queue depth.
    depth: u16,

    /// Max io buffer size per request, in bytes.
    io_buf_bytes: u32,

    /// UBLK_F_* driver feature flags.
    ctrl_flags: u64,

    /// Target-private flags, forwarded as `ublksrv_flags`.
    ctrl_target_flags: u64,

    /// Library-level behavior flags.
    dev_flags: UblkFlags,
}
769
impl Default for UblkCtrlBuilder<'_> {
    /// Defaults: auto-allocated id, one queue, depth 64, 512 KiB io buffer,
    /// no flags.
    fn default() -> Self {
        UblkCtrlBuilder {
            name: "none",
            id: -1,
            nr_queues: 1,
            depth: 64,
            io_buf_bytes: 524288,
            ctrl_flags: 0,
            ctrl_target_flags: 0,
            dev_flags: UblkFlags::empty(),
        }
    }
}

impl UblkCtrlBuilder<'_> {
    /// Consume the builder and create the synchronous control device.
    pub fn build(self) -> Result<UblkCtrl, UblkError> {
        UblkCtrl::new(
            Some(self.name.to_string()),
            self.id,
            self.nr_queues.into(),
            self.depth.into(),
            self.io_buf_bytes,
            self.ctrl_flags,
            self.ctrl_target_flags,
            self.dev_flags,
        )
    }
    /// Consume the builder and create the async control device.
    pub async fn build_async(self) -> Result<super::ctrl_async::UblkCtrlAsync, UblkError> {
        super::ctrl_async::UblkCtrlAsync::new_async(
            Some(self.name.to_string()),
            self.id,
            self.nr_queues.into(),
            self.depth.into(),
            self.io_buf_bytes,
            self.ctrl_flags,
            self.ctrl_target_flags,
            self.dev_flags,
        )
        .await
    }
}
814
/// Public handle for one ublk control device; all mutable state lives in
/// `UblkCtrlInner` behind an `RwLock`.
pub struct UblkCtrl {
    inner: RwLock<UblkCtrlInner>,
}

/// Mutable control-device state: kernel info, persisted JSON, and per-queue
/// bookkeeping.
pub(crate) struct UblkCtrlInner {
    pub(crate) name: Option<String>,
    file: fs::File, // open handle on /dev/ublk-control
    pub(crate) dev_info: sys::ublksrv_ctrl_dev_info,
    pub(crate) json_manager: UblkJsonManager,
    pub(crate) features: Option<u64>, // driver feature mask, if GET_FEATURES worked

    pub(crate) dev_flags: UblkFlags,
    pub(crate) force_async: bool, // permit async cmds from a sync-configured handle

    force_sync: bool, // permit sync cmds from an async-configured handle (set in Drop)
    cmd_token: i32, // monotonically increasing token for sync commands
    queue_tids: Vec<i32>, // tid of each queue's handler thread
    queue_selected_cpus: Vec<usize>, // chosen CPU per queue (single-CPU policy)
    pub(crate) nr_queues_configured: u16,
}
849
850impl UblkCtrlInner {
852 pub(crate) const UBLK_CTRL_ASYNC_AWAIT: UblkFlags = UblkFlags::UBLK_DEV_F_INTERNAL_3;
855
856 async fn get_queue_affinity_effective_async(
857 &mut self,
858 qid: u16,
859 ) -> Result<UblkQueueAffinity, UblkError> {
860 let mut kernel_affinity = UblkQueueAffinity::new();
861 self.get_queue_affinity_async(qid as u32, &mut kernel_affinity)
862 .await?;
863
864 if self
865 .dev_flags
866 .contains(UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY)
867 {
868 let selected_cpu = self.queue_selected_cpus[qid as usize];
870 Ok(UblkQueueAffinity::from_single_cpu(selected_cpu))
871 } else {
872 Ok(kernel_affinity)
873 }
874 }
875 fn get_queue_affinity_effective(&mut self, qid: u16) -> Result<UblkQueueAffinity, UblkError> {
877 let mut kernel_affinity = UblkQueueAffinity::new();
878 self.get_queue_affinity(qid as u32, &mut kernel_affinity)?;
879
880 if self
881 .dev_flags
882 .contains(UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY)
883 {
884 let selected_cpu = self.queue_selected_cpus[qid as usize];
886 Ok(UblkQueueAffinity::from_single_cpu(selected_cpu))
887 } else {
888 Ok(kernel_affinity)
889 }
890 }
891
892 async fn select_single_cpu_for_queue_async(
894 &mut self,
895 qid: u16,
896 cpu: Option<usize>,
897 ) -> Result<usize, UblkError> {
898 let mut kernel_affinity = UblkQueueAffinity::new();
899 self.get_queue_affinity_async(qid as u32, &mut kernel_affinity)
900 .await?;
901
902 let selected_cpu = if let Some(cpu) = cpu {
903 let available_cpus = kernel_affinity.to_bits_vec();
905 if available_cpus.contains(&cpu) {
906 cpu
907 } else {
908 return Err(UblkError::OtherError(-libc::EINVAL));
909 }
910 } else {
911 kernel_affinity.get_random_cpu().unwrap_or(0)
913 };
914
915 if (qid as usize) < self.queue_selected_cpus.len() {
917 self.queue_selected_cpus[qid as usize] = selected_cpu;
918 Ok(selected_cpu)
919 } else {
920 Err(UblkError::OtherError(-libc::EINVAL))
921 }
922 }
923 fn select_single_cpu_for_queue(
925 &mut self,
926 qid: u16,
927 cpu: Option<usize>,
928 ) -> Result<usize, UblkError> {
929 let mut kernel_affinity = UblkQueueAffinity::new();
930 self.get_queue_affinity(qid as u32, &mut kernel_affinity)?;
931
932 let selected_cpu = if let Some(cpu) = cpu {
933 let available_cpus = kernel_affinity.to_bits_vec();
935 if available_cpus.contains(&cpu) {
936 cpu
937 } else {
938 return Err(UblkError::OtherError(-libc::EINVAL));
939 }
940 } else {
941 kernel_affinity.get_random_cpu().unwrap_or(0)
943 };
944
945 if (qid as usize) < self.queue_selected_cpus.len() {
947 self.queue_selected_cpus[qid as usize] = selected_cpu;
948 Ok(selected_cpu)
949 } else {
950 Err(UblkError::OtherError(-libc::EINVAL))
951 }
952 }
953
954 fn create_thread_affinity(&mut self, qid: u16) -> Result<UblkQueueAffinity, UblkError> {
956 if self
957 .dev_flags
958 .contains(UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY)
959 {
960 let selected_cpu = self.select_single_cpu_for_queue(qid, None)?;
962 Ok(UblkQueueAffinity::from_single_cpu(selected_cpu))
963 } else {
964 let mut kernel_affinity = UblkQueueAffinity::new();
966 self.get_queue_affinity(qid as u32, &mut kernel_affinity)?;
967 Ok(kernel_affinity)
968 }
969 }
970
971 pub(crate) async fn create_thread_affinity_async(
973 &mut self,
974 qid: u16,
975 ) -> Result<UblkQueueAffinity, UblkError> {
976 if self
977 .dev_flags
978 .contains(UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY)
979 {
980 let selected_cpu = self.select_single_cpu_for_queue_async(qid, None).await?;
982 Ok(UblkQueueAffinity::from_single_cpu(selected_cpu))
983 } else {
984 let mut kernel_affinity = UblkQueueAffinity::new();
986 self.get_queue_affinity_async(qid as u32, &mut kernel_affinity)
987 .await?;
988 Ok(kernel_affinity)
989 }
990 }
991}
992
impl Drop for UblkCtrlInner {
    fn drop(&mut self) {
        let id = self.dev_info.dev_id;
        trace!("ctrl: device {} dropped", id);
        // Devices this handle created are torn down on drop. Sync commands
        // are forced because drop cannot await; failures are only traced.
        if self.for_add_dev() {
            self.force_sync = true;
            if let Err(r) = self.del() {
                trace!("Delete char device {} failed {}", self.dev_info.dev_id, r);
            }
        }
    }
}
1006
impl UblkCtrlInner {
    /// Char-device path prefix ("/dev/ublkcN").
    const CDEV_PATH: &'static str = "/dev/ublkc";
    /// Block-device path prefix ("/dev/ublkbN").
    pub(crate) const BDEV_PATH: &'static str = "/dev/ublkb";

    // Set once DEL_DEV has been issued, so the device is never deleted twice.
    const UBLK_CTRL_DEV_DELETED: UblkFlags = UblkFlags::UBLK_DEV_F_INTERNAL_2;
    // Union of every driver feature flag this crate knows how to handle.
    const UBLK_DRV_F_ALL: u64 = (sys::UBLK_F_SUPPORT_ZERO_COPY
        | sys::UBLK_F_URING_CMD_COMP_IN_TASK
        | sys::UBLK_F_NEED_GET_DATA
        | sys::UBLK_F_USER_RECOVERY
        | sys::UBLK_F_USER_RECOVERY_REISSUE
        | sys::UBLK_F_UNPRIVILEGED_DEV
        | sys::UBLK_F_CMD_IOCTL_ENCODE
        | sys::UBLK_F_USER_COPY
        | sys::UBLK_F_ZONED
        | sys::UBLK_F_AUTO_BUF_REG) as u64;
1024
    /// Seed a `ublksrv_ctrl_dev_info` from the creation parameters; the
    /// current pid is recorded as the server pid.
    fn create_device_info(
        id: i32,
        nr_queues: u32,
        depth: u32,
        io_buf_bytes: u32,
        flags: u64,
        tgt_flags: u64,
    ) -> sys::ublksrv_ctrl_dev_info {
        sys::ublksrv_ctrl_dev_info {
            nr_hw_queues: nr_queues as u16,
            queue_depth: depth as u16,
            max_io_buf_bytes: io_buf_bytes,
            dev_id: id as u32, // NOTE(review): -1 wraps to u32::MAX — presumably "auto-allocate"
            ublksrv_pid: unsafe { libc::getpid() } as i32,
            flags,
            ublksrv_flags: tgt_flags,
            ..Default::default()
        }
    }
1045
1046 fn open_control_device() -> Result<fs::File, UblkError> {
1048 fs::OpenOptions::new()
1049 .read(true)
1050 .write(true)
1051 .open(CTRL_PATH)
1052 .map_err(UblkError::from)
1053 }
1054
1055 fn init_queue_data(nr_queues: u32) -> (Vec<i32>, Vec<usize>) {
1057 let queue_tids = {
1058 let mut tids = Vec::<i32>::with_capacity(nr_queues as usize);
1059 unsafe {
1060 tids.set_len(nr_queues as usize);
1061 }
1062 tids
1063 };
1064 let queue_selected_cpus = vec![0; nr_queues as usize];
1065 (queue_tids, queue_selected_cpus)
1066 }
1067
1068 fn handle_device_lifecycle(&mut self, id: i32) -> Result<(), UblkError> {
1070 if self.for_add_dev() {
1071 self.add()?;
1072 self.json_manager.update_dev_id(self.dev_info.dev_id);
1074 } else if id >= 0 {
1075 if let Err(_) = self.reload_json() {
1076 eprintln!("device reload json failed");
1077 }
1078 self.read_dev_info()?;
1079 }
1080 Ok(())
1081 }
1082
1083 async fn handle_device_lifecycle_async(&mut self, id: i32) -> Result<(), UblkError> {
1085 if self.for_add_dev() {
1086 self.add_async().await?;
1087 self.json_manager.update_dev_id(self.dev_info.dev_id);
1089 } else if id >= 0 {
1090 if let Err(_) = self.reload_json() {
1091 eprintln!("device reload json failed");
1092 }
1093 self.read_dev_info_async().await?;
1094 }
1095 Ok(())
1096 }
1097
    // True once DEL_DEV was issued for this device.
    fn is_deleted(&self) -> bool {
        self.dev_flags.intersects(Self::UBLK_CTRL_DEV_DELETED)
    }

    // Record that DEL_DEV was issued so later deletes become no-ops.
    fn mark_deleted(&mut self) {
        self.dev_flags |= Self::UBLK_CTRL_DEV_DELETED;
    }
1105
1106 fn detect_features(&mut self) {
1108 self.features = match self.__get_features() {
1109 Ok(f) => Some(f),
1110 _ => None,
1111 };
1112 }
1113
1114 async fn detect_features_async(&mut self) {
1116 self.features = match self.__get_features_async().await {
1117 Ok(f) => Some(f),
1118 _ => None,
1119 };
1120 }
1121
    /// Build the inner control object (sync path): open the control device,
    /// ensure this thread has a control ring, detect driver features, then
    /// add or load the device according to `config`.
    fn new(config: UblkCtrlConfig) -> Result<UblkCtrlInner, UblkError> {
        let dev_info = Self::create_device_info(
            config.id,
            config.nr_queues,
            config.depth,
            config.io_buf_bytes,
            config.flags,
            config.tgt_flags,
        );
        let file = Self::open_control_device()?;
        let (queue_tids, queue_selected_cpus) = Self::init_queue_data(config.nr_queues);

        // Control traffic is sparse; a small ring depth is plenty.
        init_ctrl_task_ring_default(16)?;

        let mut dev = UblkCtrlInner {
            name: config.name,
            file,
            dev_info,
            json_manager: UblkJsonManager::new(dev_info.dev_id),
            cmd_token: 0,
            queue_tids,
            queue_selected_cpus,
            nr_queues_configured: 0,
            dev_flags: config.dev_flags,
            force_async: false,
            force_sync: false,
            features: None,
        };

        dev.detect_features();
        dev.handle_device_lifecycle(config.id)?;

        log::info!(
            "ctrl: device {} flags {:x} created",
            dev.dev_info.dev_id,
            dev.dev_flags
        );

        Ok(dev)
    }

    /// Async counterpart of `new`; identical construction, but features and
    /// device lifecycle are handled over the async command path.
    async fn new_async(config: UblkCtrlConfig) -> Result<UblkCtrlInner, UblkError> {
        let dev_info = Self::create_device_info(
            config.id,
            config.nr_queues,
            config.depth,
            config.io_buf_bytes,
            config.flags,
            config.tgt_flags,
        );
        let file = Self::open_control_device()?;
        let (queue_tids, queue_selected_cpus) = Self::init_queue_data(config.nr_queues);

        init_ctrl_task_ring_default(16)?;

        let mut dev = UblkCtrlInner {
            name: config.name,
            file,
            dev_info,
            json_manager: UblkJsonManager::new(dev_info.dev_id),
            cmd_token: 0,
            queue_tids,
            queue_selected_cpus,
            nr_queues_configured: 0,
            dev_flags: config.dev_flags,
            force_async: false,
            force_sync: false,
            features: None,
        };

        dev.detect_features_async().await;
        dev.handle_device_lifecycle_async(config.id).await?;

        log::info!(
            "ctrl/async: device {} flags {:x} created",
            dev.dev_info.dev_id,
            dev.dev_flags
        );

        Ok(dev)
    }
1205
    /// Sync constructor taking individual parameters; wraps them into a
    /// `UblkCtrlConfig` and delegates to `new`.
    #[allow(clippy::too_many_arguments)]
    fn new_with_params(
        name: Option<String>,
        id: i32,
        nr_queues: u32,
        depth: u32,
        io_buf_bytes: u32,
        flags: u64,
        tgt_flags: u64,
        dev_flags: UblkFlags,
    ) -> Result<UblkCtrlInner, UblkError> {
        let config = UblkCtrlConfig::new(
            name,
            id,
            nr_queues,
            depth,
            io_buf_bytes,
            flags,
            tgt_flags,
            dev_flags,
        );
        Self::new(config)
    }

    /// Async counterpart of [`Self::new_with_params`].
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn new_with_params_async(
        name: Option<String>,
        id: i32,
        nr_queues: u32,
        depth: u32,
        io_buf_bytes: u32,
        flags: u64,
        tgt_flags: u64,
        dev_flags: UblkFlags,
    ) -> Result<UblkCtrlInner, UblkError> {
        let config = UblkCtrlConfig::new(
            name,
            id,
            nr_queues,
            depth,
            io_buf_bytes,
            flags,
            tgt_flags,
            dev_flags,
        );
        Self::new_async(config).await
    }
1255
    // Whether the device was created with UBLK_F_UNPRIVILEGED_DEV.
    fn is_unprivileged(&self) -> bool {
        (self.dev_info.flags & (super::sys::UBLK_F_UNPRIVILEGED_DEV as u64)) != 0
    }

    /// "/dev/ublkcN" path of this device's char device.
    pub(crate) fn get_cdev_path(&self) -> String {
        format!("{}{}", UblkCtrlInner::CDEV_PATH, self.dev_info.dev_id)
    }

    // True when this handle created the device (and so deletes it on drop).
    pub(crate) fn for_add_dev(&self) -> bool {
        self.dev_flags.intersects(UblkFlags::UBLK_DEV_F_ADD_DEV)
    }

    // True when this handle is recovering an existing device.
    fn for_recover_dev(&self) -> bool {
        self.dev_flags.intersects(UblkFlags::UBLK_DEV_F_RECOVER_DEV)
    }

    // Human-readable form of the kernel device state.
    fn dev_state_desc(&self) -> String {
        match self.dev_info.state as u32 {
            sys::UBLK_S_DEV_DEAD => "DEAD".to_string(),
            sys::UBLK_S_DEV_LIVE => "LIVE".to_string(),
            sys::UBLK_S_DEV_QUIESCED => "QUIESCED".to_string(),
            _ => "UNKNOWN".to_string(),
        }
    }

    // Record the handler-thread tid of queue `qid`.
    // Panics if `qid` is out of range — caller invariant.
    pub(crate) fn store_queue_tid(&mut self, qid: u16, tid: i32) {
        self.queue_tids[qid as usize] = tid;
    }

    // Pretty-print queue/target info from the persisted JSON file.
    pub(crate) fn dump_from_json(&self) {
        self.json_manager.dump_json();
    }

    // Path of this device's persisted JSON file.
    pub(crate) fn run_path(&self) -> String {
        self.json_manager.get_json_path()
    }
1294
    /// Pack `data` into a 128-byte uring-cmd SQE (`UringCmd80`) against the
    /// control fd, tagging it with `token` for completion matching.
    fn ublk_ctrl_prep_cmd(
        &mut self,
        fd: i32,
        dev_id: u32,
        data: &UblkCtrlCmdData,
        token: u64,
    ) -> squeue::Entry128 {
        // Only forward buffer/data fields the flags mark as valid.
        let cmd = sys::ublksrv_ctrl_cmd {
            addr: if (data.flags & CTRL_CMD_HAS_BUF) != 0 {
                data.addr
            } else {
                0
            },
            len: if (data.flags & CTRL_CMD_HAS_BUF) != 0 {
                data.len as u16
            } else {
                0
            },
            data: if (data.flags & CTRL_CMD_HAS_DATA) != 0 {
                [data.data]
            } else {
                [0]
            },
            dev_id,
            queue_id: u16::MAX, // control commands are not queue-specific
            dev_path_len: data.dev_path_len,
            ..Default::default()
        };
        // Reinterpret the typed command as the raw 80-byte SQE cmd area.
        let c_cmd = CtrlCmd { ctrl_cmd: cmd };

        opcode::UringCmd80::new(types::Fd(fd), data.cmd_op)
            .cmd(unsafe { c_cmd.buf })
            .build()
            .user_data(token)
    }
1330
    /// Queue `data` on this thread's control ring without submitting/waiting;
    /// the returned future resolves when the matching CQE is reaped.
    fn ublk_submit_cmd_async(&mut self, data: &UblkCtrlCmdData) -> UblkUringOpFuture {
        let fd = self.file.as_raw_fd();
        let dev_id = self.dev_info.dev_id;
        let f = UblkUringOpFuture::new(0);
        let sqe = self.ublk_ctrl_prep_cmd(fd, dev_id, data, f.user_data);

        unsafe {
            with_ctrl_ring_mut_internal!(|ring: &mut IoUring<squeue::Entry128>| {
                // A full SQ is only warned about; the future then simply
                // never completes for this command.
                if let Err(e) = ring.submission().push(&sqe) {
                    eprintln!("Warning: Failed to push SQE to submission queue: {:?}", e);
                }
            })
        }
        f
    }

    /// Push `data`, submit, and wait for `to_wait` completions; returns the
    /// token to pass to `poll_cmd` for reaping the result.
    fn ublk_submit_cmd(
        &mut self,
        data: &UblkCtrlCmdData,
        to_wait: usize,
    ) -> Result<u64, UblkError> {
        let fd = self.file.as_raw_fd();
        let dev_id = self.dev_info.dev_id;

        // Fresh token per command so the CQE can be matched in poll_cmd.
        let token = {
            self.cmd_token += 1;
            self.cmd_token
        } as u64;
        let sqe = self.ublk_ctrl_prep_cmd(fd, dev_id, data, token);

        with_ctrl_ring_mut_internal!(|r: &mut IoUring<squeue::Entry128>| {
            unsafe {
                if let Err(e) = r.submission().push(&sqe) {
                    eprintln!("Warning: Failed to push SQE to submission queue: {:?}", e);
                    return;
                }
            };
            let _ = r.submit_and_wait(to_wait);
        });
        Ok(token)
    }

    // Reap one CQE: its result when it matches `token`, otherwise -EAGAIN
    // (also returned when no completion is available yet).
    fn poll_cmd(&mut self, token: u64) -> i32 {
        with_ctrl_ring_mut_internal!(|r: &mut IoUring<squeue::Entry128>| {
            let res = match r.completion().next() {
                Some(cqe) => {
                    if cqe.user_data() != token {
                        -libc::EAGAIN
                    } else {
                        cqe.result()
                    }
                }
                None => -libc::EAGAIN,
            };

            res
        })
    }
1393
1394 fn ublk_ctrl_need_retry(
1395 new_data: &mut UblkCtrlCmdData,
1396 data: &UblkCtrlCmdData,
1397 res: i32,
1398 ) -> bool {
1399 let legacy_op = data.cmd_op & 0xff;
1400
1401 if res >= 0 || res == -libc::EBUSY || (legacy_op > sys::UBLK_CMD_GET_DEV_INFO2) {
1408 false
1409 } else {
1410 *new_data = *data;
1411 new_data.cmd_op = legacy_op;
1412 true
1413 }
1414 }
1415
1416 fn ublk_err_to_result(res: i32) -> Result<i32, UblkError> {
1418 if res >= 0 || res == -libc::EBUSY {
1419 Ok(res)
1420 } else {
1421 Err(UblkError::UringIOError(res))
1422 }
1423 }
1424
    /// Run one control command on the async path: prepend the unprivileged
    /// dev path when required, submit, and retry once with the legacy
    /// opcode if the ioctl-encoded one is rejected.
    ///
    /// Fails with `-EPERM` when invoked from a sync-configured handle,
    /// unless `force_async` is set.
    async fn ublk_ctrl_cmd_async(&mut self, data: &UblkCtrlCmdData) -> Result<i32, UblkError> {
        if !self.force_async
            && !self
                .dev_flags
                .contains(UblkCtrlInner::UBLK_CTRL_ASYNC_AWAIT)
        {
            log::warn!("Warn: async cmd {:x} is run from sync context", data.cmd_op);
            return Err(UblkError::OtherError(-libc::EPERM));
        }

        let mut new_data = *data;
        let mut res: i32 = 0;

        // At most two attempts: ioctl-encoded opcode, then legacy fallback.
        for _ in 0..2 {
            let (old_buf, _new) = new_data.prep_un_privileged_dev_path(self);
            res = self.ublk_submit_cmd_async(&new_data).await;
            new_data.unprep_un_privileged_dev_path(self, old_buf);

            trace!("ublk_ctrl_cmd_async: cmd {:x} res {}", data.cmd_op, res);
            if !Self::ublk_ctrl_need_retry(&mut new_data, data, res) {
                break;
            }
        }

        Self::ublk_err_to_result(res)
    }

    /// Sync twin of `ublk_ctrl_cmd_async`: submit and poll on the current
    /// thread's control ring, with the same legacy-opcode retry.
    ///
    /// Fails with `-EPERM` when the handle is configured for async use,
    /// unless `force_sync` is set (e.g. during drop).
    fn ublk_ctrl_cmd(&mut self, data: &UblkCtrlCmdData) -> Result<i32, UblkError> {
        if !self.force_sync
            && self
                .dev_flags
                .contains(UblkCtrlInner::UBLK_CTRL_ASYNC_AWAIT)
        {
            log::warn!("Warn: sync cmd {:x} is run from async context", data.cmd_op);
            return Err(UblkError::OtherError(-libc::EPERM));
        }

        let mut new_data = *data;
        let mut res: i32 = 0;

        // At most two attempts: ioctl-encoded opcode, then legacy fallback.
        for _ in 0..2 {
            let (old_buf, _new) = new_data.prep_un_privileged_dev_path(self);
            let token = self.ublk_submit_cmd(&new_data, 1)?;
            res = self.poll_cmd(token);
            new_data.unprep_un_privileged_dev_path(self, old_buf);

            trace!("ublk_ctrl_cmd: cmd {:x} res {}", data.cmd_op, res);
            if !Self::ublk_ctrl_need_retry(&mut new_data, data, res) {
                break;
            }
        }

        Self::ublk_err_to_result(res)
    }
1481
    // Build ADD_DEV: copy-in of our dev_info; no dev path (the char device
    // doesn't exist yet).
    fn prepare_add_cmd(&self) -> UblkCtrlCmdData {
        UblkCtrlCmdData::new_write_buffer_cmd(
            sys::UBLK_U_CMD_ADD_DEV,
            std::ptr::addr_of!(self.dev_info) as u64,
            core::mem::size_of::<sys::ublksrv_ctrl_dev_info>() as u32,
            true, )
    }

    /// Register this device with the ublk driver (sync ADD_DEV).
    fn add(&mut self) -> Result<i32, UblkError> {
        let data = self.prepare_add_cmd();
        self.ublk_ctrl_cmd(&data)
    }

    /// Async counterpart of `add`.
    async fn add_async(&mut self) -> Result<i32, UblkError> {
        let data = self.prepare_add_cmd();
        self.ublk_ctrl_cmd_async(&data).await
    }
1503
    // Build the delete command; the *_ASYNC opcode is selected when
    // requested explicitly or via UBLK_DEV_F_DEL_DEV_ASYNC.
    fn prepare_del_cmd(&self, force_async: bool) -> UblkCtrlCmdData {
        let cmd_op = if force_async
            || self
                .dev_flags
                .intersects(UblkFlags::UBLK_DEV_F_DEL_DEV_ASYNC)
        {
            sys::UBLK_U_CMD_DEL_DEV_ASYNC
        } else {
            sys::UBLK_U_CMD_DEL_DEV
        };
        UblkCtrlCmdData::new_simple_cmd(cmd_op)
    }

    /// Delete the device (sync). Idempotent: a second call after success is
    /// a no-op thanks to the deleted flag.
    fn del(&mut self) -> Result<i32, UblkError> {
        if self.is_deleted() {
            return Ok(0);
        }

        let data = self.prepare_del_cmd(false);
        let res = self.ublk_ctrl_cmd(&data)?;
        self.mark_deleted();
        Ok(res)
    }

    /// Delete using the DEL_DEV_ASYNC opcode, but issued over the
    /// synchronous command path. Idempotent like `del`.
    fn del_async(&mut self) -> Result<i32, UblkError> {
        if self.is_deleted() {
            return Ok(0);
        }

        let data = self.prepare_del_cmd(true);
        let res = self.ublk_ctrl_cmd(&data)?;
        self.mark_deleted();
        Ok(res)
    }

    /// Fully async delete: DEL_DEV_ASYNC opcode, awaited submission.
    /// Idempotent like `del`.
    pub(crate) async fn del_async_await(&mut self) -> Result<i32, UblkError> {
        if self.is_deleted() {
            return Ok(0);
        }

        let data = self.prepare_del_cmd(true);
        let res = self.ublk_ctrl_cmd_async(&data).await?;
        self.mark_deleted();
        Ok(res)
    }
1561
1562 fn prepare_get_features_cmd(features_ptr: u64) -> UblkCtrlCmdData {
1564 UblkCtrlCmdData::new_read_buffer_cmd(
1565 sys::UBLK_U_CMD_GET_FEATURES,
1566 features_ptr,
1567 core::mem::size_of::<u64>() as u32,
1568 true, )
1570 }
1571
1572 fn __get_features(&mut self) -> Result<u64, UblkError> {
1573 let features = 0_u64;
1574 let data = Self::prepare_get_features_cmd(std::ptr::addr_of!(features) as u64);
1575 self.ublk_ctrl_cmd(&data)?;
1576 Ok(features)
1577 }
1578
1579 async fn __get_features_async(&mut self) -> Result<u64, UblkError> {
1580 let features = 0_u64;
1581 let data = Self::prepare_get_features_cmd(std::ptr::addr_of!(features) as u64);
1582 self.ublk_ctrl_cmd_async(&data).await?;
1583 Ok(features)
1584 }
1585
    /// Build a read-back command (GET_DEV_INFO or GET_DEV_INFO2) targeting
    /// `self.dev_info`.
    ///
    /// NOTE(review): the kernel writes into `self.dev_info` through a const
    /// pointer taken from `&self` — confirm this aliasing is intended by the
    /// crate's design before changing callers.
    fn prepare_read_dev_info_cmd(&self, cmd_op: u32) -> UblkCtrlCmdData {
        UblkCtrlCmdData::new_read_buffer_cmd(
            cmd_op,
            std::ptr::addr_of!(self.dev_info) as u64,
            core::mem::size_of::<sys::ublksrv_ctrl_dev_info>() as u32,
            false, )
    }
1595
1596 fn __read_dev_info(&mut self) -> Result<i32, UblkError> {
1597 let data = self.prepare_read_dev_info_cmd(sys::UBLK_U_CMD_GET_DEV_INFO);
1598 self.ublk_ctrl_cmd(&data)
1599 }
1600
1601 fn __read_dev_info2(&mut self) -> Result<i32, UblkError> {
1602 let data = self.prepare_read_dev_info_cmd(sys::UBLK_U_CMD_GET_DEV_INFO2);
1603 self.ublk_ctrl_cmd(&data)
1604 }
1605
1606 fn read_dev_info(&mut self) -> Result<i32, UblkError> {
1607 self.__read_dev_info2().or_else(|_| self.__read_dev_info())
1608 }
1609
1610 pub(crate) async fn read_dev_info_async(&mut self) -> Result<i32, UblkError> {
1613 match self.__read_dev_info2_async().await {
1614 Ok(result) => Ok(result),
1615 Err(_) => self.__read_dev_info_async().await,
1616 }
1617 }
1618
1619 async fn __read_dev_info_async(&mut self) -> Result<i32, UblkError> {
1620 let data = self.prepare_read_dev_info_cmd(sys::UBLK_U_CMD_GET_DEV_INFO);
1621 self.ublk_ctrl_cmd_async(&data).await
1622 }
1623
1624 async fn __read_dev_info2_async(&mut self) -> Result<i32, UblkError> {
1625 let data = self.prepare_read_dev_info_cmd(sys::UBLK_U_CMD_GET_DEV_INFO2);
1626 self.ublk_ctrl_cmd_async(&data).await
1627 }
1628
    /// Build the START_DEV command carrying the daemon pid as payload.
    fn prepare_start_cmd(pid: i32) -> UblkCtrlCmdData {
        UblkCtrlCmdData::new_data_cmd(sys::UBLK_U_CMD_START_DEV, pid as u64)
    }
1633
1634 fn start(&mut self, pid: i32) -> Result<i32, UblkError> {
1637 let data = Self::prepare_start_cmd(pid);
1638 self.ublk_ctrl_cmd(&data)
1639 }
1640
1641 async fn start_async(&mut self, pid: i32) -> Result<i32, UblkError> {
1644 let data = Self::prepare_start_cmd(pid);
1645 self.ublk_ctrl_cmd_async(&data).await
1646 }
1647
    /// Build the payload-free STOP_DEV command.
    fn prepare_stop_cmd() -> UblkCtrlCmdData {
        UblkCtrlCmdData::new_simple_cmd(sys::UBLK_U_CMD_STOP_DEV)
    }
1652
1653 fn stop(&mut self) -> Result<i32, UblkError> {
1656 let data = Self::prepare_stop_cmd();
1657 self.ublk_ctrl_cmd(&data)
1658 }
1659
1660 pub(crate) async fn stop_async(&mut self) -> Result<i32, UblkError> {
1663 let data = Self::prepare_stop_cmd();
1664 self.ublk_ctrl_cmd_async(&data).await
1665 }
1666
1667 fn prepare_get_params_cmd(params: &mut sys::ublk_params) -> UblkCtrlCmdData {
1669 params.len = core::mem::size_of::<sys::ublk_params>() as u32;
1670 UblkCtrlCmdData::new_read_buffer_cmd(
1671 sys::UBLK_U_CMD_GET_PARAMS,
1672 params as *const sys::ublk_params as u64,
1673 params.len,
1674 false, )
1676 }
1677
1678 fn get_params(&mut self, params: &mut sys::ublk_params) -> Result<i32, UblkError> {
1683 let data = Self::prepare_get_params_cmd(params);
1684 self.ublk_ctrl_cmd(&data)
1685 }
1686
1687 pub(crate) async fn get_params_async(
1690 &mut self,
1691 params: &mut sys::ublk_params,
1692 ) -> Result<i32, UblkError> {
1693 let data = Self::prepare_get_params_cmd(params);
1694 self.ublk_ctrl_cmd_async(&data).await
1695 }
1696
1697 fn set_params(&mut self, params: &sys::ublk_params) -> Result<i32, UblkError> {
1702 let mut p = *params;
1703
1704 p.len = core::mem::size_of::<sys::ublk_params>() as u32;
1705 let data = UblkCtrlCmdData::new_write_buffer_cmd(
1706 sys::UBLK_U_CMD_SET_PARAMS,
1707 std::ptr::addr_of!(p) as u64,
1708 p.len,
1709 false, );
1711
1712 self.ublk_ctrl_cmd(&data)
1713 }
1714
1715 pub(crate) async fn set_params_async(
1720 &mut self,
1721 params: &sys::ublk_params,
1722 ) -> Result<i32, UblkError> {
1723 let mut p = *params;
1724
1725 p.len = core::mem::size_of::<sys::ublk_params>() as u32;
1726 let data = UblkCtrlCmdData::new_write_buffer_cmd(
1727 sys::UBLK_U_CMD_SET_PARAMS,
1728 std::ptr::addr_of!(p) as u64,
1729 p.len,
1730 false, );
1732 self.ublk_ctrl_cmd_async(&data).await
1733 }
1734
1735 fn prepare_get_queue_affinity_cmd(q: u32, bm: &mut UblkQueueAffinity) -> UblkCtrlCmdData {
1737 UblkCtrlCmdData::new_data_buffer_cmd(
1738 sys::UBLK_U_CMD_GET_QUEUE_AFFINITY,
1739 q as u64,
1740 bm.addr() as u64,
1741 bm.buf_len() as u32,
1742 true, )
1744 }
1745
1746 fn get_queue_affinity(&mut self, q: u32, bm: &mut UblkQueueAffinity) -> Result<i32, UblkError> {
1747 let data = Self::prepare_get_queue_affinity_cmd(q, bm);
1748 self.ublk_ctrl_cmd(&data)
1749 }
1750
1751 pub(crate) async fn get_queue_affinity_async(
1754 &mut self,
1755 q: u32,
1756 bm: &mut UblkQueueAffinity,
1757 ) -> Result<i32, UblkError> {
1758 let data = Self::prepare_get_queue_affinity_cmd(q, bm);
1759 self.ublk_ctrl_cmd_async(&data).await
1760 }
1761
    /// Build the payload-free START_USER_RECOVERY command.
    fn prepare_start_user_recover_cmd() -> UblkCtrlCmdData {
        UblkCtrlCmdData::new_simple_cmd(sys::UBLK_U_CMD_START_USER_RECOVERY)
    }
1766
1767 fn __start_user_recover(&mut self) -> Result<i32, UblkError> {
1768 let data = Self::prepare_start_user_recover_cmd();
1769 self.ublk_ctrl_cmd(&data)
1770 }
1771
1772 pub(crate) async fn __start_user_recover_async(&mut self) -> Result<i32, UblkError> {
1773 let data = Self::prepare_start_user_recover_cmd();
1774 self.ublk_ctrl_cmd_async(&data).await
1775 }
1776
    /// Build the END_USER_RECOVERY command carrying the new daemon pid.
    fn prepare_end_user_recover_cmd(pid: i32) -> UblkCtrlCmdData {
        UblkCtrlCmdData::new_data_cmd(sys::UBLK_U_CMD_END_USER_RECOVERY, pid as u64)
    }
1781
1782 fn end_user_recover(&mut self, pid: i32) -> Result<i32, UblkError> {
1785 let data = Self::prepare_end_user_recover_cmd(pid);
1786 self.ublk_ctrl_cmd(&data)
1787 }
1788
1789 async fn end_user_recover_async(&mut self, pid: i32) -> Result<i32, UblkError> {
1792 let data = Self::prepare_end_user_recover_cmd(pid);
1793 self.ublk_ctrl_cmd_async(&data).await
1794 }
1795
1796 fn prep_start_dev(&mut self, dev: &UblkDev) -> Result<i32, UblkError> {
1797 self.read_dev_info()?;
1798 if self.dev_info.state == sys::UBLK_S_DEV_LIVE as u16 {
1799 return Ok(0);
1800 }
1801
1802 if self.dev_info.state != sys::UBLK_S_DEV_QUIESCED as u16 {
1803 self.set_params(&dev.tgt.params)?;
1804 self.flush_json()?;
1805 } else if self.for_recover_dev() {
1806 self.flush_json()?;
1807 } else {
1808 return Err(UblkError::OtherError(-libc::EINVAL));
1809 };
1810
1811 Ok(0)
1812 }
1813
1814 async fn prep_start_dev_async(&mut self, dev: &UblkDev) -> Result<i32, UblkError> {
1816 self.read_dev_info_async().await?;
1817 if self.dev_info.state == sys::UBLK_S_DEV_LIVE as u16 {
1818 return Ok(0);
1819 }
1820
1821 if self.dev_info.state != sys::UBLK_S_DEV_QUIESCED as u16 {
1822 self.set_params_async(&dev.tgt.params).await?;
1823 self.flush_json()?;
1824 } else if self.for_recover_dev() {
1825 self.flush_json()?;
1826 } else {
1827 return Err(UblkError::OtherError(-libc::EINVAL));
1828 };
1829
1830 Ok(0)
1831 }
1832
1833 pub(crate) async fn start_dev_async(&mut self, dev: &UblkDev) -> Result<i32, UblkError> {
1834 self.prep_start_dev_async(dev).await?;
1835
1836 if self.dev_info.state != sys::UBLK_S_DEV_QUIESCED as u16 {
1837 self.start_async(unsafe { libc::getpid() as i32 }).await
1838 } else if self.for_recover_dev() {
1839 self.end_user_recover_async(unsafe { libc::getpid() as i32 })
1840 .await
1841 } else {
1842 Err(crate::UblkError::OtherError(-libc::EINVAL))
1843 }
1844 }
1845
1846 fn flush_json(&mut self) -> Result<i32, UblkError> {
1848 if !self.for_add_dev() && !self.for_recover_dev() {
1851 return Ok(0);
1852 }
1853
1854 self.json_manager.flush_json()
1855 }
1856
1857 fn build_json_with_affinities(
1868 &self,
1869 dev: &UblkDev,
1870 queue_affinities: &[UblkQueueAffinity],
1871 ) -> serde_json::Value {
1872 let tgt_data = dev.get_target_json();
1873 let mut map: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
1874
1875 for qid in 0..dev.dev_info.nr_hw_queues {
1876 let affinity = &queue_affinities[qid as usize];
1877
1878 map.insert(
1879 format!("{}", qid),
1880 serde_json::json!({
1881 "qid": qid,
1882 "tid": self.queue_tids[qid as usize],
1883 "affinity": affinity.to_bits_vec(),
1884 }),
1885 );
1886 }
1887
1888 let mut json = serde_json::json!({
1889 "dev_info": dev.dev_info,
1890 "target": dev.tgt,
1891 "target_flags": dev.flags.bits(),
1892 });
1893
1894 if let Some(val) = tgt_data {
1895 json["target_data"] = val.clone()
1896 }
1897
1898 json["queues"] = serde_json::Value::Object(map);
1899 json
1900 }
1901
1902 fn build_json_internal(&mut self, dev: &UblkDev) -> Result<serde_json::Value, UblkError> {
1904 let mut queue_affinities = Vec::with_capacity(dev.dev_info.nr_hw_queues as usize);
1905
1906 for qid in 0..dev.dev_info.nr_hw_queues {
1907 let affinity = self.get_queue_affinity_effective(qid)?;
1908 queue_affinities.push(affinity);
1909 }
1910
1911 Ok(self.build_json_with_affinities(dev, &queue_affinities))
1912 }
1913
1914 async fn build_json_internal_async(
1916 &mut self,
1917 dev: &UblkDev,
1918 ) -> Result<serde_json::Value, UblkError> {
1919 let mut queue_affinities = Vec::with_capacity(dev.dev_info.nr_hw_queues as usize);
1920
1921 for qid in 0..dev.dev_info.nr_hw_queues {
1922 let affinity = self.get_queue_affinity_effective_async(qid).await?;
1923 queue_affinities.push(affinity);
1924 }
1925
1926 Ok(self.build_json_with_affinities(dev, &queue_affinities))
1927 }
1928
1929 fn update_json_queue_tids(&mut self, dev: &UblkDev) -> bool {
1931 if !self.json_manager.get_json().is_null()
1932 && self.json_manager.get_json().is_object()
1933 && !self.json_manager.get_json().as_object().unwrap().is_empty()
1934 {
1935 if let Some(queues) = self.json_manager.get_json_mut().get_mut("queues") {
1936 for qid in 0..dev.dev_info.nr_hw_queues {
1937 if let Some(queue) = queues.get_mut(&qid.to_string()) {
1938 if let Some(tid) = queue.get_mut("tid") {
1939 *tid = serde_json::json!(self.queue_tids[qid as usize]);
1940 }
1941 }
1942 }
1943 }
1944 true
1945 } else {
1946 false
1947 }
1948 }
1949
1950 fn build_json(&mut self, dev: &UblkDev) -> Result<i32, UblkError> {
1951 if self.update_json_queue_tids(dev) {
1953 return Ok(0);
1954 }
1955
1956 let json = self.build_json_internal(dev)?;
1957 self.json_manager.set_json(json);
1958 Ok(0)
1959 }
1960
1961 pub(crate) async fn build_json_async(&mut self, dev: &UblkDev) -> Result<i32, UblkError> {
1962 if self.update_json_queue_tids(dev) {
1964 return Ok(0);
1965 }
1966
1967 let json = self.build_json_internal_async(dev).await?;
1968 self.json_manager.set_json(json);
1969 Ok(0)
1970 }
1971
    /// Re-read the JSON document from the run file (delegates to the
    /// JSON manager).
    fn reload_json(&mut self) -> Result<i32, UblkError> {
        self.json_manager.reload_json()
    }
1977
    /// Print a multi-line human-readable summary of the device to stdout:
    /// topology + capacity, daemon/flags/state, then device-node numbers
    /// and owner ids taken from `p.devt`.
    pub(crate) fn dump_device_info(&self, p: &sys::ublk_params) {
        let info = &self.dev_info;
        println!(
            "\ndev id {}: nr_hw_queues {} queue_depth {} block size {} dev_capacity {}",
            info.dev_id,
            info.nr_hw_queues,
            info.queue_depth,
            // block size derived from the logical block-size shift
            1 << p.basic.logical_bs_shift,
            p.basic.dev_sectors
        );
        println!(
            "\tmax rq size {} daemon pid {} flags 0x{:x} state {}",
            info.max_io_buf_bytes,
            info.ublksrv_pid,
            info.flags,
            self.dev_state_desc()
        );
        println!(
            "\tublkc: {}:{} ublkb: {}:{} owner: {}:{}",
            p.devt.char_major,
            p.devt.char_minor,
            p.devt.disk_major,
            p.devt.disk_minor,
            info.owner_uid,
            info.owner_gid
        );
    }
2010
2011 fn validate_param(condition: bool) -> Result<(), UblkError> {
2013 if condition {
2014 Ok(())
2015 } else {
2016 Err(UblkError::InvalidVal)
2017 }
2018 }
2019
    #[allow(clippy::too_many_arguments)]
    /// Validate every user-supplied device parameter before construction.
    ///
    /// Checks, in order: only known driver flags; no internal-only dev
    /// flags; the ublk_drv control node exists; mlock'd IO buffers are
    /// incompatible with user-copy / auto-buf-reg / zero-copy; id >= -1
    /// (-1 means "driver allocates"); queue count and depth within driver
    /// limits; buffer size <= 32MB and page-aligned.
    pub(crate) fn validate_new_params(
        flags: u64,
        dev_flags: UblkFlags,
        id: i32,
        nr_queues: u32,
        depth: u32,
        io_buf_bytes: u32,
    ) -> Result<(), UblkError> {
        Self::validate_param((flags & !Self::UBLK_DRV_F_ALL) == 0)?;

        Self::validate_param(!dev_flags.intersects(crate::ublk_internal_flags_all!()))?;

        if !Path::new(CTRL_PATH).exists() {
            eprintln!("Please run `modprobe ublk_drv` first");
            return Err(UblkError::OtherError(-libc::ENOENT));
        }

        // mlock'd buffers require the library to own the IO buffer, which the
        // copy-avoiding driver modes below bypass.
        if dev_flags.intersects(UblkFlags::UBLK_DEV_F_MLOCK_IO_BUFFER) {
            Self::validate_param(
                (flags & sys::UBLK_F_USER_COPY as u64) == 0
                    && (flags & sys::UBLK_F_AUTO_BUF_REG as u64) == 0
                    && (flags & sys::UBLK_F_SUPPORT_ZERO_COPY as u64) == 0,
            )?;
        }

        Self::validate_param(id >= -1)?;
        Self::validate_param(nr_queues <= sys::UBLK_MAX_NR_QUEUES)?;
        Self::validate_param(depth <= sys::UBLK_MAX_QUEUE_DEPTH)?;

        // io_buf_bytes must be a multiple of the page size (mask trick
        // assumes page_sz is a power of two) and no larger than MAX_BUF_SZ.
        let page_sz = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u32;
        Self::validate_param(io_buf_bytes <= MAX_BUF_SZ && (io_buf_bytes & (page_sz - 1)) == 0)?;

        Ok(())
    }
2074
2075 fn sys_result_to_error(res: i32) -> Result<i32, UblkError> {
2079 if res >= 0 {
2080 Ok(res)
2081 } else {
2082 Err(UblkError::OtherError(res))
2083 }
2084 }
2085
2086 fn validate_queue_id(qid: u16, max_queues: u16) -> Result<(), UblkError> {
2088 if (qid as usize) < (max_queues as usize) {
2089 Ok(())
2090 } else {
2091 Err(UblkError::OtherError(-libc::EINVAL))
2092 }
2093 }
2094
2095 fn validate_thread_id(tid: i32) -> Result<(), UblkError> {
2097 if tid != 0 {
2098 Ok(())
2099 } else {
2100 Err(UblkError::OtherError(-libc::ESRCH))
2101 }
2102 }
2103}
2104
2105impl UblkCtrl {
2106 fn get_inner(&self) -> std::sync::RwLockReadGuard<'_, UblkCtrlInner> {
2107 self.inner.read().unwrap_or_else(|poisoned| {
2108 eprintln!("Warning: RwLock poisoned, recovering");
2109 poisoned.into_inner()
2110 })
2111 }
2112
2113 fn get_inner_mut(&self) -> std::sync::RwLockWriteGuard<'_, UblkCtrlInner> {
2114 self.inner.write().unwrap_or_else(|poisoned| {
2115 eprintln!("Warning: RwLock poisoned, recovering");
2116 poisoned.into_inner()
2117 })
2118 }
2119
2120 pub fn get_name(&self) -> String {
2121 let inner = self.get_inner();
2122
2123 match &inner.name {
2124 Some(name) => name.clone(),
2125 None => "none".to_string(),
2126 }
2127 }
2128
    /// Library-level UblkFlags this device was created with.
    pub(crate) fn get_dev_flags(&self) -> UblkFlags {
        self.get_inner().dev_flags
    }
2132
    /// Create a new ublk control handle.
    ///
    /// `id == -1` asks the driver to allocate a device id. `flags` are the
    /// driver-level UBLK_F_* bits, `tgt_flags` is opaque data stored as
    /// `ublksrv_flags`, and `dev_flags` are library-level `UblkFlags`.
    ///
    /// Returns `InvalidVal` if internal async-await flags are passed, or an
    /// error from parameter validation / inner-state construction.
    pub fn new(
        name: Option<String>,
        id: i32,
        nr_queues: u32,
        depth: u32,
        io_buf_bytes: u32,
        flags: u64,
        tgt_flags: u64,
        dev_flags: UblkFlags,
    ) -> Result<UblkCtrl, UblkError> {
        // The async-await control flags are reserved for internal use.
        if dev_flags.intersects(UblkCtrlInner::UBLK_CTRL_ASYNC_AWAIT) {
            return Err(UblkError::InvalidVal);
        }

        UblkCtrlInner::validate_new_params(flags, dev_flags, id, nr_queues, depth, io_buf_bytes)?;

        let inner = RwLock::new(UblkCtrlInner::new_with_params(
            name,
            id,
            nr_queues,
            depth,
            io_buf_bytes,
            flags,
            tgt_flags,
            dev_flags,
        )?);

        Ok(UblkCtrl { inner })
    }
2162
    /// Open an existing device by id with no create/add semantics.
    ///
    /// # Panics
    /// Panics if `id` is negative (an existing device always has an id).
    pub fn new_simple(id: i32) -> Result<UblkCtrl, UblkError> {
        assert!(id >= 0);
        Self::new(None, id, 0, 0, 0, 0, 0, UblkFlags::empty())
    }
2169
    /// Copy of the device's `ublksrv_ctrl_dev_info` as currently cached.
    pub fn dev_info(&self) -> sys::ublksrv_ctrl_dev_info {
        self.get_inner().dev_info
    }
2174
    /// Cached driver feature mask, if the driver reported one; `None`
    /// when GET_FEATURES is unsupported.
    pub fn get_driver_features(&self) -> Option<u64> {
        self.get_inner().features
    }
2182
    /// Path of this device's char device node (delegates to the inner
    /// state's path builder).
    pub fn get_cdev_path(&self) -> String {
        self.get_inner().get_cdev_path()
    }
2187
2188 pub fn get_bdev_path(&self) -> String {
2190 format!(
2191 "{}{}",
2192 UblkCtrlInner::BDEV_PATH,
2193 self.get_inner().dev_info.dev_id
2194 )
2195 }
2196
    /// Look up queue `qid`'s thread id from the JSON document.
    pub fn get_queue_tid(&self, qid: u32) -> Result<i32, UblkError> {
        let ctrl = self.get_inner();
        ctrl.json_manager.get_queue_tid_from_json(qid as u16)
    }
2207
    /// Read the target flags stored in the JSON document.
    pub fn get_target_flags_from_json(&self) -> Result<u32, UblkError> {
        let ctrl = self.get_inner();
        ctrl.json_manager.get_target_flags_from_json()
    }
2214
    /// Deserialize the target description from the JSON document.
    pub fn get_target_from_json(&self) -> Result<super::io::UblkTgt, UblkError> {
        let ctrl = self.get_inner();
        ctrl.json_manager.get_target_from_json()
    }
2221
    /// Target-private JSON data, if any was stored in the document.
    pub fn get_target_data_from_json(&self) -> Option<serde_json::Value> {
        let ctrl = self.get_inner();
        ctrl.json_manager.get_target_data_from_json()
    }
2230
    /// Read the target type string from the JSON document.
    pub fn get_target_type_from_json(&self) -> Result<String, UblkError> {
        let ctrl = self.get_inner();
        ctrl.json_manager.get_target_type_from_json()
    }
2237
2238 pub fn configure_queue(&self, dev: &UblkDev, qid: u16, tid: i32) -> Result<i32, UblkError> {
2248 let mut ctrl = self.get_inner_mut();
2249
2250 ctrl.store_queue_tid(qid, tid);
2251
2252 ctrl.nr_queues_configured += 1;
2253
2254 if ctrl.nr_queues_configured == ctrl.dev_info.nr_hw_queues {
2255 ctrl.build_json(dev)?;
2256 }
2257
2258 Ok(0)
2259 }
2260
2261 pub fn dump(&self) {
2262 let mut ctrl = self.get_inner_mut();
2263 let mut p = sys::ublk_params {
2264 ..Default::default()
2265 };
2266
2267 if ctrl.read_dev_info().is_err() {
2268 error!("Dump dev {} failed\n", ctrl.dev_info.dev_id);
2269 return;
2270 }
2271
2272 if ctrl.get_params(&mut p).is_err() {
2273 error!("Dump dev {} failed\n", ctrl.dev_info.dev_id);
2274 return;
2275 }
2276
2277 ctrl.dump_device_info(&p);
2278 ctrl.dump_from_json();
2279 println!("\tublksrv_flags: 0x{:x}", ctrl.dev_info.ublksrv_flags);
2280 }
2281
2282 pub fn run_dir() -> String {
2283 String::from("/run/ublksrvd")
2284 }
2285
    /// Path of this device's JSON run file.
    pub fn run_path(&self) -> String {
        self.get_inner().run_path()
    }
2291
2292 pub fn get_features() -> Option<u64> {
2296 match Self::new(None, -1, 0, 0, 0, 0, 0, UblkFlags::empty()) {
2297 Ok(ctrl) => ctrl.get_driver_features(),
2298 _ => None,
2299 }
2300 }
2301
    /// Refresh the cached `dev_info` from the driver.
    pub fn read_dev_info(&self) -> Result<i32, UblkError> {
        self.get_inner_mut().read_dev_info()
    }
2307
    /// Read the device parameters from the driver into `params`.
    pub fn get_params(&self, params: &mut sys::ublk_params) -> Result<i32, UblkError> {
        self.get_inner_mut().get_params(params)
    }
2315
    /// Push `params` to the driver (caller's struct is not modified).
    pub fn set_params(&self, params: &sys::ublk_params) -> Result<i32, UblkError> {
        self.get_inner_mut().set_params(params)
    }
2323
    /// Fetch queue `q`'s kernel-side CPU affinity bitmap into `bm`.
    pub fn get_queue_affinity(&self, q: u32, bm: &mut UblkQueueAffinity) -> Result<i32, UblkError> {
        self.get_inner_mut().get_queue_affinity(q, bm)
    }
2329
    /// Pin queue `qid` to a single CPU; with `cpu == None` the inner state
    /// selects one. Returns the CPU chosen.
    pub fn set_queue_single_affinity(
        &self,
        qid: u16,
        cpu: Option<usize>,
    ) -> Result<usize, UblkError> {
        self.get_inner_mut().select_single_cpu_for_queue(qid, cpu)
    }
2349
    /// Read the *effective* CPU affinity of queue `qid`'s thread via
    /// `sched_getaffinity` on its tid.
    ///
    /// Errors: EINVAL for an out-of-range queue id, ESRCH when the queue
    /// thread was never registered (tid == 0), otherwise the errno from
    /// the syscall.
    pub fn get_queue_effective_affinity(
        &self,
        qid: u16,
        affinity: &mut UblkQueueAffinity,
    ) -> Result<i32, UblkError> {
        let inner = self.get_inner();

        UblkCtrlInner::validate_queue_id(qid, inner.queue_tids.len() as u16)?;

        let tid = inner.queue_tids[qid as usize];

        UblkCtrlInner::validate_thread_id(tid)?;

        // SAFETY: `affinity` owns a buffer of `buf_len()` bytes that the
        // kernel fills; it outlives the call.
        let result = unsafe {
            libc::sched_getaffinity(
                tid,
                affinity.buf_len(),
                affinity.addr_mut() as *mut libc::cpu_set_t,
            )
        };

        if result == 0 {
            Ok(0)
        } else {
            UblkCtrlInner::sys_result_to_error(-unsafe { *libc::__errno_location() })
        }
    }
2391
    /// Kick off user recovery, retrying while the driver reports busy.
    ///
    /// Retries every 100ms for up to ~30s. Note the retry path only
    /// triggers when the command *succeeds* with a -EBUSY payload inside
    /// the Ok value; any Err returns immediately.
    pub fn start_user_recover(&self) -> Result<i32, UblkError> {
        let mut count = 0u32;
        let unit = 100_u32;

        loop {
            let res = self.get_inner_mut().__start_user_recover();
            if let Ok(r) = res {
                if r == -libc::EBUSY {
                    std::thread::sleep(std::time::Duration::from_millis(unit as u64));
                    count += unit;
                    // Give up after roughly 30 seconds of busy responses.
                    if count < 30000 {
                        continue;
                    }
                }
            }
            return res;
        }
    }
2412
2413 pub fn start_dev(&self, dev: &UblkDev) -> Result<i32, UblkError> {
2426 let mut ctrl = self.get_inner_mut();
2427 ctrl.prep_start_dev(dev)?;
2428
2429 dev.wait_for_buffer_registration(ctrl.dev_info.nr_hw_queues as usize)?;
2431
2432 if ctrl.dev_info.state != sys::UBLK_S_DEV_QUIESCED as u16 {
2433 ctrl.start(unsafe { libc::getpid() as i32 })
2434 } else if ctrl.for_recover_dev() {
2435 ctrl.end_user_recover(unsafe { libc::getpid() as i32 })
2436 } else {
2437 Err(crate::UblkError::OtherError(-libc::EINVAL))
2438 }
2439 }
2440
2441 pub async fn start_dev_async(&self, dev: &UblkDev) -> Result<i32, UblkError> {
2456 let mut ctrl = self.get_inner_mut();
2457
2458 ctrl.force_async = true;
2459
2460 dev.wait_for_buffer_registration(ctrl.dev_info.nr_hw_queues as usize)?;
2462
2463 let res = ctrl.start_dev_async(dev).await;
2464 ctrl.force_async = false;
2465 res
2466 }
2467
2468 pub fn stop_dev(&self) -> Result<i32, UblkError> {
2473 let mut ctrl = self.get_inner_mut();
2474 let rp = ctrl.run_path();
2475
2476 if ctrl.for_add_dev() && Path::new(&rp).exists() {
2477 fs::remove_file(rp)?;
2478 }
2479 ctrl.stop()
2480 }
2481
    /// Stop the device without touching the JSON run file (unlike
    /// `stop_dev`).
    pub fn kill_dev(&self) -> Result<i32, UblkError> {
        self.get_inner_mut().stop()
    }
2493
2494 pub fn del_dev(&self) -> Result<i32, UblkError> {
2503 let mut ctrl = self.get_inner_mut();
2504
2505 ctrl.del()?;
2506 if Path::new(&ctrl.run_path()).exists() {
2507 fs::remove_file(ctrl.run_path())?;
2508 }
2509 Ok(0)
2510 }
2511
2512 pub fn del_dev_async(&self) -> Result<i32, UblkError> {
2515 let mut ctrl = self.get_inner_mut();
2516
2517 ctrl.del_async()?;
2518 if Path::new(&ctrl.run_path()).exists() {
2519 fs::remove_file(ctrl.run_path())?;
2520 }
2521 Ok(0)
2522 }
2523
2524 fn calculate_queue_affinity(&self, queue_id: u16) -> UblkQueueAffinity {
2529 self.get_inner_mut()
2530 .create_thread_affinity(queue_id)
2531 .unwrap_or_else(|_| {
2532 let mut affinity = UblkQueueAffinity::new();
2534 self.get_queue_affinity(queue_id as u32, &mut affinity)
2535 .unwrap_or_default();
2536 affinity
2537 })
2538 }
2539
    /// Pin queue thread `tid` to the CPUs computed for queue `qid`.
    ///
    /// The sched_setaffinity return value is ignored — affinity is
    /// best-effort here and failure must not abort queue setup.
    pub fn set_thread_affinity(&self, qid: u16, tid: libc::pid_t) {
        let affinity = self.calculate_queue_affinity(qid);

        // SAFETY: `affinity` owns a cpu-set buffer of `buf_len()` bytes
        // that stays alive for the duration of the syscall.
        unsafe {
            libc::sched_setaffinity(
                tid,
                affinity.buf_len(),
                affinity.addr() as *const libc::cpu_set_t,
            );
        }
    }
2557
    /// Per-queue-thread setup: returns the thread's tid after marking the
    /// thread with PR_SET_IO_FLUSHER (prctl return ignored; presumably
    /// older kernels without the option just fail the call harmlessly —
    /// TODO confirm minimum kernel).
    pub fn init_queue_thread() -> libc::pid_t {
        let tid = unsafe { libc::gettid() };

        // SAFETY: plain prctl call with constant arguments.
        unsafe {
            // PR_SET_IO_FLUSHER is not exposed by the libc crate; value 57
            // matches the kernel uapi.
            const PR_SET_IO_FLUSHER: i32 = 57;
            libc::prctl(PR_SET_IO_FLUSHER, 0, 0, 0, 0);
        }

        tid
    }
2573
    /// Spawn one handler thread per hardware queue.
    ///
    /// Each thread runs `init_queue_thread`, reports its (qid, tid) back
    /// over an mpsc channel, then enters `q_fn`. The parent collects every
    /// report, pins the thread's affinity and registers it via
    /// `configure_queue`. Failures in the handshake are logged and skipped
    /// so the remaining queues still come up.
    fn create_queue_handlers<Q>(
        &self,
        dev: &Arc<UblkDev>,
        q_fn: Q,
    ) -> Vec<std::thread::JoinHandle<()>>
    where
        Q: FnOnce(u16, &UblkDev) + Send + Sync + Clone + 'static,
    {
        use std::sync::mpsc;

        let mut q_threads = Vec::new();
        let nr_queues = dev.dev_info.nr_hw_queues;

        let (tx, rx) = mpsc::channel();

        for q in 0..nr_queues {
            let _dev = Arc::clone(dev);
            let _tx = tx.clone();
            let mut _q_fn = q_fn.clone();

            q_threads.push(std::thread::spawn(move || {
                let tid = Self::init_queue_thread();
                // If the parent is gone, running the queue fn is pointless.
                if let Err(e) = _tx.send((q, tid)) {
                    eprintln!("Warning: Failed to send queue thread info: {}", e);
                    return;
                }
                _q_fn(q, &_dev);
            }));
        }

        // Collect exactly one (qid, tid) handshake per spawned thread.
        for _q in 0..nr_queues {
            let (qid, tid) = match rx.recv() {
                Ok(data) => data,
                Err(e) => {
                    eprintln!("Warning: Failed to receive queue thread info: {}", e);
                    continue;
                }
            };

            self.set_thread_affinity(qid, tid);
            if let Err(e) = self.configure_queue(dev, qid, tid) {
                eprintln!(
                    "Warning: configure queue failed for {}-{}: {:?}",
                    dev.dev_info.dev_id, qid, e
                );
            }
        }

        q_threads
    }
2625
    /// Full device lifecycle driver: create the UblkDev via `tgt_fn`,
    /// spawn queue handler threads running `q_fn`, start the device, hand
    /// control to `device_fn`, then join the queue threads and stop.
    ///
    /// `device_fn` is expected to eventually cause the queue threads to
    /// exit (e.g. by stopping/killing the device); the joins below block
    /// until they do.
    pub fn run_target<T, Q, W>(&self, tgt_fn: T, q_fn: Q, device_fn: W) -> Result<i32, UblkError>
    where
        T: FnOnce(&mut UblkDev) -> Result<(), UblkError>,
        Q: FnOnce(u16, &UblkDev) + Send + Sync + Clone + 'static,
        W: FnOnce(&UblkCtrl) + Send + Sync + 'static,
    {
        let dev = &Arc::new(UblkDev::new(self.get_name(), tgt_fn, self)?);
        let handles = self.create_queue_handlers(dev, q_fn);

        self.start_dev(dev)?;

        device_fn(self);

        for qh in handles {
            qh.join().unwrap_or_else(|_| {
                eprintln!("dev-{} join queue thread failed", dev.dev_info.dev_id)
            });
        }

        // Best-effort stop; the device may already be gone at this point.
        let _ = self.stop_dev();

        Ok(0)
    }
2665
2666 pub fn for_each_dev_id<T>(ops: T)
2668 where
2669 T: Fn(u32) + Clone + 'static,
2670 {
2671 if let Ok(entries) = std::fs::read_dir("/sys/class/ublk-char") {
2672 for entry in entries.flatten() {
2673 let f = entry.path();
2674 if let Some(file_name) = f.file_name() {
2675 if let Some(name) = file_name.to_str() {
2676 if name.starts_with("ublkc") {
2678 if let Ok(dev_id) = name[5..].parse::<u32>() {
2679 ops(dev_id);
2680 }
2681 }
2682 }
2683 }
2684 }
2685 }
2686 }
2687}
2688
2689#[cfg(test)]
2690mod tests {
2691 use crate::ctrl::{UblkCtrlBuilder, UblkQueueAffinity};
2692 use crate::io::{UblkDev, UblkIOCtx, UblkQueue};
2693 use crate::UblkError;
2694 use crate::{ctrl::UblkCtrl, UblkFlags, UblkIORes};
2695 use std::cell::Cell;
2696 use std::path::Path;
2697 use std::rc::Rc;
2698
    #[test]
    fn test_ublk_get_features() {
        // GET_FEATURES needs linux v6.5+; absence is reported, not a failure.
        match UblkCtrl::get_features() {
            Some(f) => eprintln!("features is {:04x}", f),
            None => eprintln!("not support GET_FEATURES, require linux v6.5"),
        }
    }
2706
    // Add a 1-queue device (optionally flagged for async delete) and check
    // its char device node shows up; the UblkCtrl drop handles cleanup.
    fn __test_add_ctrl_dev(del_async: bool) {
        let ctrl = UblkCtrl::new(
            None,
            -1,
            1,
            64,
            512_u32 * 1024,
            0,
            0,
            if del_async {
                UblkFlags::UBLK_DEV_F_DEL_DEV_ASYNC
            } else {
                UblkFlags::empty()
            } | UblkFlags::UBLK_DEV_F_ADD_DEV,
        )
        .unwrap();
        let dev_path = ctrl.get_cdev_path();

        // Give udev time to create the device node.
        std::thread::sleep(std::time::Duration::from_millis(500));
        assert!(Path::new(&dev_path).exists() == true);
    }
    #[test]
    fn test_add_ctrl_dev_del_sync() {
        // Device created without the async-delete flag.
        __test_add_ctrl_dev(false);
    }
2732
    #[test]
    fn test_add_ctrl_dev_del_async() {
        // Device created with UBLK_DEV_F_DEL_DEV_ASYNC set.
        __test_add_ctrl_dev(true);
    }
2737
    #[test]
    fn test_add_ctrl_dev_del_async2() {
        let ctrl = UblkCtrl::new(
            None,
            -1,
            1,
            64,
            512_u32 * 1024,
            0,
            0,
            UblkFlags::UBLK_DEV_F_ADD_DEV,
        )
        .unwrap();

        // Explicit async delete must either succeed or fail with ENOTSUPP
        // (-524, kernel-internal) / EOPNOTSUPP on drivers lacking the opcode.
        match ctrl.del_dev_async() {
            Ok(_res) => {}
            Err(UblkError::UringIOError(res)) => {
                assert!(res == -524 || res == -libc::EOPNOTSUPP);
            }
            _ => assert!(false),
        }
    }
2761
    #[test]
    // Create a device with UBLK_F_UNPRIVILEGED_DEV and verify its char node
    // appears. (Note: the fn name's "privileted" typo is kept — renaming a
    // test changes CI filter behavior.)
    fn test_add_un_privileted_ublk() {
        let ctrl = UblkCtrl::new(
            None,
            -1,
            1,
            64,
            512_u32 * 1024,
            0,
            crate::sys::UBLK_F_UNPRIVILEGED_DEV as u64,
            UblkFlags::UBLK_DEV_F_ADD_DEV,
        )
        .unwrap();
        let dev_path = ctrl.get_cdev_path();

        // Give udev time to create the device node.
        std::thread::sleep(std::time::Duration::from_millis(500));
        assert!(Path::new(&dev_path).exists() == true);
    }
2781
    #[test]
    fn test_set_queue_single_affinity() {
        let ctrl = UblkCtrlBuilder::default()
            .name("null")
            .nr_queues(2_u16)
            .dev_flags(UblkFlags::UBLK_DEV_F_ADD_DEV)
            .build()
            .unwrap();

        // Out-of-range queue id must be rejected.
        let invalid_queue = ctrl.set_queue_single_affinity(100, None);
        assert!(invalid_queue.is_err());

        // No CPU was ever selected, so both slots stay at their default (0).
        let inner = ctrl.get_inner();
        assert_eq!(inner.queue_selected_cpus.len(), 2);
        assert_eq!(inner.queue_selected_cpus[0], 0);
        assert_eq!(inner.queue_selected_cpus[1], 0);
    }
2801
2802 #[test]
2803 fn test_get_queue_effective_affinity() {
2804 let ctrl = UblkCtrlBuilder::default()
2805 .name("null")
2806 .nr_queues(2_u16)
2807 .dev_flags(UblkFlags::UBLK_DEV_F_ADD_DEV)
2808 .build()
2809 .unwrap();
2810
2811 let mut affinity = UblkQueueAffinity::new();
2812
2813 let invalid_queue_result = ctrl.get_queue_effective_affinity(100, &mut affinity);
2815 assert!(invalid_queue_result.is_err());
2816
2817 let unconfigured_result = ctrl.get_queue_effective_affinity(0, &mut affinity);
2819 assert!(unconfigured_result.is_err());
2820 if let Err(UblkError::OtherError(err)) = unconfigured_result {
2822 assert_eq!(err, -libc::ESRCH);
2823 }
2824
2825 assert!(true); }
2829
    #[test]
    fn test_ublk_target_json() {
        let ctrl = UblkCtrlBuilder::default()
            .name("null")
            .ctrl_target_flags(0xbeef as u64)
            .dev_flags(UblkFlags::UBLK_DEV_F_ADD_DEV)
            .build()
            .unwrap();

        let tgt_init = |dev: &mut UblkDev| {
            dev.set_default_params(250_u64 << 30);
            dev.set_target_json(serde_json::json!({"null": "test_data" }));
            Ok(())
        };
        let dev = UblkDev::new(ctrl.get_name(), tgt_init, &ctrl).unwrap();

        // Target JSON lives on the UblkDev until the document is built, so
        // the ctrl-side JSON view must not see it yet.
        assert!(ctrl.get_target_data_from_json().is_none());
        assert!(dev.get_target_json().is_some());
        // ctrl_target_flags must propagate into ublksrv_flags on both views.
        assert!(dev.dev_info.ublksrv_flags == 0xbeef as u64);
        assert!(ctrl.dev_info().ublksrv_flags == 0xbeef as u64);
    }
2852
    // Run a full null-target session: 2 queues, depth 16, each IO completed
    // immediately with its full byte count. `w_fn` drives the live device
    // and is responsible for eventually stopping it. Returns the char
    // device path so callers can assert post-exit cleanup.
    fn __test_ublk_session<T>(mut w_fn: T) -> String
    where
        T: FnMut(&UblkCtrl) + Send + Sync + Clone + 'static,
    {
        let ctrl = UblkCtrlBuilder::default()
            .name("null")
            .depth(16_u16)
            .nr_queues(2_u16)
            .dev_flags(UblkFlags::UBLK_DEV_F_ADD_DEV)
            .build()
            .unwrap();

        let tgt_init = |dev: &mut UblkDev| {
            dev.set_default_params(250_u64 << 30);
            dev.set_target_json(serde_json::json!({"null": "test_data" }));
            Ok(())
        };
        let q_fn = move |qid: u16, dev: &UblkDev| {
            use crate::BufDescList;
            let bufs_rc = Rc::new(dev.alloc_queue_io_bufs());
            let bufs = bufs_rc.clone();

            // Null target: every IO "succeeds" with its requested length.
            let io_handler = move |q: &UblkQueue, tag: u16, _io: &UblkIOCtx| {
                let iod = q.get_iod(tag);
                let bytes = (iod.nr_sectors << 9) as i32;
                let bufs = bufs_rc.clone();
                let buf_addr = bufs[tag as usize].as_mut_ptr();

                #[allow(deprecated)]
                q.complete_io_cmd(tag, buf_addr, Ok(UblkIORes::Result(bytes)));
            };

            let queue = match UblkQueue::new(qid, dev)
                .unwrap()
                .submit_fetch_commands_unified(BufDescList::Slices(Some(&bufs)))
            {
                Ok(q) => q,
                Err(e) => {
                    log::error!("submit_fetch_commands_unified failed: {}", e);
                    return;
                }
            };

            queue.wait_and_handle_io(io_handler);
        };

        ctrl.run_target(tgt_init, q_fn, move |ctrl: &UblkCtrl| {
            w_fn(ctrl);
        })
        .unwrap();

        // run_target has stopped the device; the block node must be gone.
        let bdev = ctrl.get_bdev_path();
        assert!(Path::new(&bdev).exists() == false);

        let cpath = ctrl.get_cdev_path();

        cpath
    }
2912
    #[test]
    fn test_ublk_session() {
        let cdev = __test_ublk_session(|ctrl: &UblkCtrl| {
            // By now the JSON document is built, so target data is visible.
            assert!(ctrl.get_target_data_from_json().is_some());
            ctrl.kill_dev().unwrap();
        });

        // After the session ends the char device node must be gone too.
        assert!(Path::new(&cdev).exists() == false);
    }
    #[test]
    fn test_ublk_for_each_dev_id() {
        use std::sync::{atomic::AtomicI32, atomic::Ordering, Arc};

        // Dev id created by the background session, published for the
        // enumeration below (-1 until the session reports it).
        let created_dev_id = Arc::new(AtomicI32::new(-1));
        let created_dev_id_clone = created_dev_id.clone();

        // Keep a device alive for ~1s in the background.
        let handle = std::thread::spawn(move || {
            let cdev = __test_ublk_session(move |ctrl: &UblkCtrl| {
                created_dev_id_clone.store(ctrl.dev_info().dev_id as i32, Ordering::Relaxed);
                std::thread::sleep(std::time::Duration::from_millis(1000));
                ctrl.kill_dev().unwrap();
            });
            assert!(Path::new(&cdev).exists() == false);
        });

        // Wait until the device is (very likely) up, then enumerate.
        std::thread::sleep(std::time::Duration::from_millis(400));
        let cnt_arc = Rc::new(Cell::new(0));
        let cnt = cnt_arc.clone();

        UblkCtrl::for_each_dev_id(move |dev_id| {
            // Only count the device our background session created.
            let target_dev_id = created_dev_id.load(Ordering::Relaxed);
            if target_dev_id >= 0 && dev_id == target_dev_id as u32 {
                let ctrl = UblkCtrl::new_simple(dev_id as i32).unwrap();
                cnt.set(cnt.get() + 1);

                let dev_path = ctrl.get_cdev_path();
                assert!(Path::new(&dev_path).exists() == true);
            }
        });

        // The enumeration must have seen our device exactly once.
        assert_eq!(cnt_arc.get(), 1);

        handle.join().unwrap();
    }
2966
2967 #[test]
2969 fn test_single_cpu_affinity() {
2970 let single_cpu_flags =
2972 UblkFlags::UBLK_DEV_F_ADD_DEV | UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY;
2973 let normal_flags = UblkFlags::UBLK_DEV_F_ADD_DEV;
2974
2975 assert!(single_cpu_flags.contains(UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY));
2976 assert!(!normal_flags.contains(UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY));
2977
2978 let ctrl_with_flag = UblkCtrlBuilder::default()
2980 .name("test_single_cpu")
2981 .depth(16_u16)
2982 .nr_queues(2_u16)
2983 .dev_flags(single_cpu_flags)
2984 .build()
2985 .unwrap();
2986
2987 let ctrl_without_flag = UblkCtrlBuilder::default()
2988 .name("test_normal")
2989 .depth(16_u16)
2990 .nr_queues(2_u16)
2991 .dev_flags(normal_flags)
2992 .build()
2993 .unwrap();
2994
2995 assert!(ctrl_with_flag
2997 .get_dev_flags()
2998 .contains(UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY));
2999 assert!(!ctrl_without_flag
3000 .get_dev_flags()
3001 .contains(UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY));
3002
3003 let test_affinity = UblkQueueAffinity::from_single_cpu(3);
3005 let bits = test_affinity.to_bits_vec();
3006 assert_eq!(
3007 bits.len(),
3008 1,
3009 "Single CPU affinity should contain exactly one CPU"
3010 );
3011 assert_eq!(bits[0], 3, "Single CPU affinity should contain CPU 3");
3012
3013 let mut multi_cpu_affinity = UblkQueueAffinity::new();
3015 multi_cpu_affinity.set_cpu(1);
3016 multi_cpu_affinity.set_cpu(3);
3017 multi_cpu_affinity.set_cpu(5);
3018
3019 let selected_cpu = multi_cpu_affinity.get_random_cpu();
3020 assert!(
3021 selected_cpu.is_some(),
3022 "Should be able to select a CPU from multi-CPU affinity"
3023 );
3024
3025 let cpu = selected_cpu.unwrap();
3026 assert!(
3027 cpu == 1 || cpu == 3 || cpu == 5,
3028 "Selected CPU should be one of the available CPUs (1, 3, or 5), got {}",
3029 cpu
3030 );
3031
3032 println!("✓ Single CPU affinity feature tests passed");
3033 println!(" - Flag definition and usage: PASS");
3034 println!(" - Control device flag storage: PASS");
3035 println!(" - Single CPU affinity creation: PASS");
3036 println!(" - Random CPU selection: PASS (selected CPU {})", cpu);
3037 }
3038}