Skip to main content

diskann_platform/win/
io_completion_port.rs

1/*
2 * Copyright (c) Microsoft Corporation.
3 * Licensed under the MIT license.
4 */
5use std::{
6    io,
7    sync::{Mutex, MutexGuard},
8};
9
10use windows_sys::Win32::{
11    Foundation::{CloseHandle, GetLastError, HANDLE, INVALID_HANDLE_VALUE},
12    System::IO::CreateIoCompletionPort,
13};
14
15use super::{DWORD, ULONG_PTR};
16use crate::FileHandle;
17
18/// This module provides a safe and idiomatic Rust interface over the IOCompletionPort handle and associated Windows API functions.
19/// This struct represents an I/O completion port, which is an object used in asynchronous I/O operations on Windows.
20pub struct IOCompletionPort {
21    io_completion_port: Mutex<HANDLE>,
22}
23
24unsafe impl Send for IOCompletionPort {}
25unsafe impl Sync for IOCompletionPort {}
26
27impl IOCompletionPort {
28    /// Create a new IOCompletionPort.
29    /// This function wraps the Windows CreateIoCompletionPort function, providing error handling and automatic resource management.
30    ///
31    /// # Arguments
32    ///
33    /// * `file_handle` - A reference to a FileHandle to associate with the IOCompletionPort.
34    /// * `existing_completion_port` - An optional reference to an existing IOCompletionPort. If provided, the new IOCompletionPort will be associated with it.
35    /// * `completion_key` - The completion key associated with the file handle.
36    /// * `number_of_concurrent_threads` - The maximum number of threads that the operating system can allow to concurrently process I/O completion packets for the I/O completion port.
37    ///
38    /// # Return
39    ///
40    /// Returns a Result with the new IOCompletionPort if successful, or an io::Error if the function fails.
41    pub fn new(
42        file_handle: &FileHandle,
43        existing_completion_port: Option<&IOCompletionPort>,
44        completion_key: ULONG_PTR,
45        number_of_concurrent_threads: DWORD,
46    ) -> io::Result<Self> {
47        let existing_completion_port_handle = if let Some(port) = existing_completion_port {
48            Some(port.mutex_guarded_handle()?)
49        } else {
50            None
51        };
52
53        let io_completion_port = unsafe {
54            CreateIoCompletionPort(
55                file_handle.handle,
56                existing_completion_port_handle.map_or(std::ptr::null_mut(), |handle| *handle),
57                completion_key,
58                number_of_concurrent_threads,
59            )
60        };
61
62        if io_completion_port == INVALID_HANDLE_VALUE {
63            let error_code = unsafe { GetLastError() };
64            return Err(io::Error::from_raw_os_error(error_code as i32));
65        }
66
67        Ok(Self {
68            io_completion_port: Mutex::new(io_completion_port),
69        })
70    }
71
72    pub fn mutex_guarded_handle(&self) -> io::Result<MutexGuard<'_, HANDLE>> {
73        self.io_completion_port.lock().map_err(|_| {
74            io::Error::new(
75                io::ErrorKind::WouldBlock,
76                "Unable to acquire lock on IOCompletionPort.",
77            )
78        })
79    }
80}
81
82impl Drop for IOCompletionPort {
83    /// Drop method for IOCompletionPort.
84    /// This wraps the Windows CloseHandle function, providing automatic resource cleanup when the IOCompletionPort is dropped.
85    /// If an error occurs while dropping, it is logged and the drop continues. This is because panicking in Drop can cause unwinding issues.
86    fn drop(&mut self) {
87        let handle = match self.io_completion_port.lock() {
88            Ok(guard) => *guard,
89            Err(_) => {
90                tracing::warn!("Error when locking IOCompletionPort.");
91                return;
92            }
93        };
94
95        let result = unsafe { CloseHandle(handle) };
96        if result == 0 {
97            let error_code = unsafe { GetLastError() };
98            let error = io::Error::from_raw_os_error(error_code as i32);
99
100            tracing::warn!("Error when dropping IOCompletionPort: {:?}", error);
101        }
102    }
103}
104
105impl Default for IOCompletionPort {
106    /// Create a default IOCompletionPort, whose handle is set to INVALID_HANDLE_VALUE.
107    /// Returns a new IOCompletionPort with handle set to INVALID_HANDLE_VALUE.
108    fn default() -> Self {
109        Self {
110            io_completion_port: Mutex::new(INVALID_HANDLE_VALUE),
111        }
112    }
113}
114
115#[cfg(test)]
116mod tests {
117    use super::*;
118    use crate::win::file_handle::{AccessMode, ShareMode};
119
120    #[test]
121    fn create_io_completion_port() {
122        let file_name = "../test_data/delete_set_50pts.bin";
123        let file_handle = unsafe { FileHandle::new(file_name, AccessMode::Read, ShareMode::Read) }
124            .expect("Failed to create file handle.");
125
126        let io_completion_port = IOCompletionPort::new(&file_handle, None, 0, 0);
127
128        assert!(
129            io_completion_port.is_ok(),
130            "Failed to create IOCompletionPort."
131        );
132    }
133
134    #[test]
135    fn drop_io_completion_port() {
136        let file_name = "../test_data/delete_set_50pts.bin";
137        let file_handle = unsafe { FileHandle::new(file_name, AccessMode::Read, ShareMode::Read) }
138            .expect("Failed to create file handle.");
139
140        let io_completion_port = IOCompletionPort::new(&file_handle, None, 0, 0)
141            .expect("Failed to create IOCompletionPort.");
142
143        // After this line, io_completion_port goes out of scope and its Drop trait will be called.
144        let _ = io_completion_port;
145        // We have no easy way to test that the Drop trait works correctly, but if it doesn't,
146        // a resource leak or other problem may become apparent in later tests or in real use of the code.
147    }
148
149    #[test]
150    fn default_io_completion_port() {
151        let io_completion_port = IOCompletionPort::default();
152        assert_eq!(
153            *io_completion_port.mutex_guarded_handle().unwrap(),
154            INVALID_HANDLE_VALUE,
155            "Default IOCompletionPort did not have INVALID_HANDLE_VALUE."
156        );
157    }
158}