diskann_platform/win/
io_completion_port.rs1use std::{
6 io,
7 sync::{Mutex, MutexGuard},
8};
9
10use windows_sys::Win32::{
11 Foundation::{CloseHandle, GetLastError, HANDLE, INVALID_HANDLE_VALUE},
12 System::IO::CreateIoCompletionPort,
13};
14
15use super::{DWORD, ULONG_PTR};
16use crate::FileHandle;
17
18pub struct IOCompletionPort {
21 io_completion_port: Mutex<HANDLE>,
22}
23
24unsafe impl Send for IOCompletionPort {}
25unsafe impl Sync for IOCompletionPort {}
26
27impl IOCompletionPort {
28 pub fn new(
42 file_handle: &FileHandle,
43 existing_completion_port: Option<&IOCompletionPort>,
44 completion_key: ULONG_PTR,
45 number_of_concurrent_threads: DWORD,
46 ) -> io::Result<Self> {
47 let existing_completion_port_handle = if let Some(port) = existing_completion_port {
48 Some(port.mutex_guarded_handle()?)
49 } else {
50 None
51 };
52
53 let io_completion_port = unsafe {
54 CreateIoCompletionPort(
55 file_handle.handle,
56 existing_completion_port_handle.map_or(std::ptr::null_mut(), |handle| *handle),
57 completion_key,
58 number_of_concurrent_threads,
59 )
60 };
61
62 if io_completion_port == INVALID_HANDLE_VALUE {
63 let error_code = unsafe { GetLastError() };
64 return Err(io::Error::from_raw_os_error(error_code as i32));
65 }
66
67 Ok(Self {
68 io_completion_port: Mutex::new(io_completion_port),
69 })
70 }
71
72 pub fn mutex_guarded_handle(&self) -> io::Result<MutexGuard<'_, HANDLE>> {
73 self.io_completion_port.lock().map_err(|_| {
74 io::Error::new(
75 io::ErrorKind::WouldBlock,
76 "Unable to acquire lock on IOCompletionPort.",
77 )
78 })
79 }
80}
81
82impl Drop for IOCompletionPort {
83 fn drop(&mut self) {
87 let handle = match self.io_completion_port.lock() {
88 Ok(guard) => *guard,
89 Err(_) => {
90 tracing::warn!("Error when locking IOCompletionPort.");
91 return;
92 }
93 };
94
95 let result = unsafe { CloseHandle(handle) };
96 if result == 0 {
97 let error_code = unsafe { GetLastError() };
98 let error = io::Error::from_raw_os_error(error_code as i32);
99
100 tracing::warn!("Error when dropping IOCompletionPort: {:?}", error);
101 }
102 }
103}
104
105impl Default for IOCompletionPort {
106 fn default() -> Self {
109 Self {
110 io_completion_port: Mutex::new(INVALID_HANDLE_VALUE),
111 }
112 }
113}
114
115#[cfg(test)]
116mod tests {
117 use super::*;
118 use crate::win::file_handle::{AccessMode, ShareMode};
119
120 #[test]
121 fn create_io_completion_port() {
122 let file_name = "../test_data/delete_set_50pts.bin";
123 let file_handle = unsafe { FileHandle::new(file_name, AccessMode::Read, ShareMode::Read) }
124 .expect("Failed to create file handle.");
125
126 let io_completion_port = IOCompletionPort::new(&file_handle, None, 0, 0);
127
128 assert!(
129 io_completion_port.is_ok(),
130 "Failed to create IOCompletionPort."
131 );
132 }
133
134 #[test]
135 fn drop_io_completion_port() {
136 let file_name = "../test_data/delete_set_50pts.bin";
137 let file_handle = unsafe { FileHandle::new(file_name, AccessMode::Read, ShareMode::Read) }
138 .expect("Failed to create file handle.");
139
140 let io_completion_port = IOCompletionPort::new(&file_handle, None, 0, 0)
141 .expect("Failed to create IOCompletionPort.");
142
143 let _ = io_completion_port;
145 }
148
149 #[test]
150 fn default_io_completion_port() {
151 let io_completion_port = IOCompletionPort::default();
152 assert_eq!(
153 *io_completion_port.mutex_guarded_handle().unwrap(),
154 INVALID_HANDLE_VALUE,
155 "Default IOCompletionPort did not have INVALID_HANDLE_VALUE."
156 );
157 }
158}