cloud_filter/root/
session.rs

1use std::{
2    fs::OpenOptions,
3    mem::{self, MaybeUninit},
4    os::windows::{fs::OpenOptionsExt, io::AsRawHandle},
5    path::{Path, PathBuf},
6    sync::{
7        mpsc::{self, Sender, TryRecvError},
8        Arc, Weak,
9    },
10    thread::{self, JoinHandle},
11    time::Duration,
12};
13
14use widestring::{U16CString, U16Str};
15use windows::{
16    core::{self, PCWSTR},
17    Win32::{
18        Foundation::{ERROR_IO_INCOMPLETE, HANDLE, WIN32_ERROR},
19        Storage::{
20            CloudFilters::{self, CfConnectSyncRoot, CF_CONNECT_FLAGS},
21            FileSystem::{
22                ReadDirectoryChangesW, FILE_FLAG_BACKUP_SEMANTICS, FILE_FLAG_OVERLAPPED,
23                FILE_LIST_DIRECTORY, FILE_NOTIFY_CHANGE_ATTRIBUTES, FILE_NOTIFY_INFORMATION,
24            },
25        },
26        System::IO::{CancelIoEx, GetOverlappedResult},
27    },
28};
29
30use crate::{
31    filter::{self, AsyncBridge, Filter, SyncFilter},
32    root::connect::Connection,
33    utility::LocalBoxFuture,
34};
35
36/// A builder to create a new connection for the sync root at the specified path.
37#[derive(Debug, Clone, Copy)]
38pub struct Session(CF_CONNECT_FLAGS);
39
40impl Session {
41    /// Create a new [Session].
42    pub fn new() -> Self {
43        Self::default()
44    }
45
46    /// The [Session::block_implicit_hydration] flag will prevent
47    /// implicit placeholder hydrations from invoking
48    /// [SyncFilter::fetch_data][crate::filter::SyncFilter::fetch_data]. This could occur when an
49    /// anti-virus is scanning file system activity on files within the sync root.
50    ///
51    /// A call to the [Placeholder::hydrate][crate::placeholder::Placeholder::hydrate] trait will not be blocked by this flag.
52    pub fn block_implicit_hydration(mut self) -> Self {
53        self.0 |= CloudFilters::CF_CONNECT_FLAG_BLOCK_SELF_IMPLICIT_HYDRATION;
54        self
55    }
56
57    /// Initiates a connection to the sync root with the given [SyncFilter].
58    pub fn connect<P, F>(self, path: P, filter: F) -> core::Result<Connection<F>>
59    where
60        P: AsRef<Path>,
61        F: SyncFilter + 'static,
62    {
63        let filter = Arc::new(filter);
64        let callbacks = filter::callbacks::<F>();
65        let key = unsafe {
66            CfConnectSyncRoot(
67                PCWSTR(
68                    U16CString::from_os_str(path.as_ref())
69                        .expect("not contains nul")
70                        .as_ptr(),
71                ),
72                callbacks.as_ptr(),
73                // create a weak arc so that it could be upgraded when it's being used and when the
74                // connection is closed, the filter could be freed
75                Some(Weak::into_raw(Arc::downgrade(&filter)) as *const _),
76                // This is enabled by default to remove the Option requirement around various fields of the
77                // [Request][crate::Request] struct
78                self.0
79                    | CloudFilters::CF_CONNECT_FLAG_REQUIRE_FULL_FILE_PATH
80                    | CloudFilters::CF_CONNECT_FLAG_REQUIRE_PROCESS_INFO,
81            )
82        }?;
83
84        let (cancel_token, join_handle) =
85            spawn_root_watcher(path.as_ref().to_path_buf(), filter.clone());
86
87        Ok(Connection::new(
88            key.0,
89            cancel_token,
90            join_handle,
91            callbacks,
92            filter,
93        ))
94    }
95
96    /// Initiates a connection to the sync root with the given [Filter].
97    pub fn connect_async<P, F, B>(
98        self,
99        path: P,
100        filter: F,
101        block_on: B,
102    ) -> core::Result<Connection<AsyncBridge<F, B>>>
103    where
104        P: AsRef<Path>,
105        F: Filter + 'static,
106        B: Fn(LocalBoxFuture<'_, ()>) + Send + Sync + 'static,
107    {
108        self.connect(path, AsyncBridge::new(filter, block_on))
109    }
110}
111
112impl Default for Session {
113    fn default() -> Self {
114        Self(CloudFilters::CF_CONNECT_FLAG_NONE)
115    }
116}
117
118fn spawn_root_watcher<T: SyncFilter + 'static>(
119    path: PathBuf,
120    filter: Arc<T>,
121) -> (Sender<()>, JoinHandle<()>) {
122    let (tx, rx) = mpsc::channel();
123    let handle = thread::spawn(move || {
124        const CHANGE_BUF_SIZE: usize = 1024;
125
126        let sync_root = OpenOptions::new()
127            .access_mode(FILE_LIST_DIRECTORY.0)
128            .custom_flags((FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED).0)
129            .open(&path)
130            .expect("sync root directory is opened");
131        let mut changes_buf = MaybeUninit::<[u8; CHANGE_BUF_SIZE]>::zeroed();
132        let mut overlapped = MaybeUninit::zeroed();
133        let mut transferred = MaybeUninit::zeroed();
134
135        while matches!(rx.try_recv(), Err(TryRecvError::Empty)) {
136            unsafe {
137                ReadDirectoryChangesW(
138                    HANDLE(sync_root.as_raw_handle() as _),
139                    changes_buf.as_mut_ptr() as *mut _,
140                    CHANGE_BUF_SIZE as _,
141                    true,
142                    FILE_NOTIFY_CHANGE_ATTRIBUTES,
143                    None,
144                    Some(overlapped.as_mut_ptr()),
145                    None,
146                )
147            }
148            .expect("read directory changes");
149
150            loop {
151                if let Err(e) = unsafe {
152                    GetOverlappedResult(
153                        HANDLE(sync_root.as_raw_handle() as _),
154                        overlapped.as_ptr(),
155                        transferred.as_mut_ptr(),
156                        false,
157                    )
158                } {
159                    if e.code() != ERROR_IO_INCOMPLETE.to_hresult() {
160                        panic!(
161                            "get overlapped result: {:?}, expected: {ERROR_IO_INCOMPLETE:?}",
162                            WIN32_ERROR::from_error(&e),
163                        );
164                    }
165
166                    // cancel by user
167                    if !matches!(rx.try_recv(), Err(TryRecvError::Empty)) {
168                        _ = unsafe {
169                            CancelIoEx(
170                                HANDLE(sync_root.as_raw_handle() as _),
171                                Some(overlapped.as_ptr()),
172                            )
173                        };
174                        return;
175                    }
176
177                    thread::sleep(Duration::from_millis(300));
178                    continue;
179                }
180
181                if unsafe { transferred.assume_init() } == 0 {
182                    break;
183                }
184
185                let mut changes = Vec::with_capacity(8);
186                let mut entry = changes_buf.as_ptr() as *const FILE_NOTIFY_INFORMATION;
187                loop {
188                    let relative = unsafe {
189                        U16Str::from_ptr(
190                            &(*entry).FileName as *const _,
191                            (*entry).FileNameLength as usize / mem::size_of::<u16>(),
192                        )
193                    };
194
195                    changes.push(path.join(relative.to_os_string()));
196
197                    if unsafe { *entry }.NextEntryOffset == 0 {
198                        break;
199                    }
200                    entry = unsafe { entry.byte_add((*entry).NextEntryOffset as _) };
201                }
202
203                filter.state_changed(changes);
204                break;
205            }
206        }
207    });
208
209    (tx, handle)
210}