1use std::{
2 fs::OpenOptions,
3 mem::{self, MaybeUninit},
4 os::windows::{fs::OpenOptionsExt, io::AsRawHandle},
5 path::{Path, PathBuf},
6 sync::{
7 mpsc::{self, Sender, TryRecvError},
8 Arc, Weak,
9 },
10 thread::{self, JoinHandle},
11 time::Duration,
12};
13
14use widestring::{U16CString, U16Str};
15use windows::{
16 core::{self, PCWSTR},
17 Win32::{
18 Foundation::{ERROR_IO_INCOMPLETE, HANDLE, WIN32_ERROR},
19 Storage::{
20 CloudFilters::{self, CfConnectSyncRoot, CF_CONNECT_FLAGS},
21 FileSystem::{
22 ReadDirectoryChangesW, FILE_FLAG_BACKUP_SEMANTICS, FILE_FLAG_OVERLAPPED,
23 FILE_LIST_DIRECTORY, FILE_NOTIFY_CHANGE_ATTRIBUTES, FILE_NOTIFY_INFORMATION,
24 },
25 },
26 System::IO::{CancelIoEx, GetOverlappedResult},
27 },
28};
29
30use crate::{
31 filter::{self, AsyncBridge, Filter, SyncFilter},
32 root::connect::Connection,
33 utility::LocalBoxFuture,
34};
35
36#[derive(Debug, Clone, Copy)]
38pub struct Session(CF_CONNECT_FLAGS);
39
40impl Session {
41 pub fn new() -> Self {
43 Self::default()
44 }
45
46 pub fn block_implicit_hydration(mut self) -> Self {
53 self.0 |= CloudFilters::CF_CONNECT_FLAG_BLOCK_SELF_IMPLICIT_HYDRATION;
54 self
55 }
56
57 pub fn connect<P, F>(self, path: P, filter: F) -> core::Result<Connection<F>>
59 where
60 P: AsRef<Path>,
61 F: SyncFilter + 'static,
62 {
63 let filter = Arc::new(filter);
64 let callbacks = filter::callbacks::<F>();
65 let key = unsafe {
66 CfConnectSyncRoot(
67 PCWSTR(
68 U16CString::from_os_str(path.as_ref())
69 .expect("not contains nul")
70 .as_ptr(),
71 ),
72 callbacks.as_ptr(),
73 Some(Weak::into_raw(Arc::downgrade(&filter)) as *const _),
76 self.0
79 | CloudFilters::CF_CONNECT_FLAG_REQUIRE_FULL_FILE_PATH
80 | CloudFilters::CF_CONNECT_FLAG_REQUIRE_PROCESS_INFO,
81 )
82 }?;
83
84 let (cancel_token, join_handle) =
85 spawn_root_watcher(path.as_ref().to_path_buf(), filter.clone());
86
87 Ok(Connection::new(
88 key.0,
89 cancel_token,
90 join_handle,
91 callbacks,
92 filter,
93 ))
94 }
95
96 pub fn connect_async<P, F, B>(
98 self,
99 path: P,
100 filter: F,
101 block_on: B,
102 ) -> core::Result<Connection<AsyncBridge<F, B>>>
103 where
104 P: AsRef<Path>,
105 F: Filter + 'static,
106 B: Fn(LocalBoxFuture<'_, ()>) + Send + Sync + 'static,
107 {
108 self.connect(path, AsyncBridge::new(filter, block_on))
109 }
110}
111
112impl Default for Session {
113 fn default() -> Self {
114 Self(CloudFilters::CF_CONNECT_FLAG_NONE)
115 }
116}
117
118fn spawn_root_watcher<T: SyncFilter + 'static>(
119 path: PathBuf,
120 filter: Arc<T>,
121) -> (Sender<()>, JoinHandle<()>) {
122 let (tx, rx) = mpsc::channel();
123 let handle = thread::spawn(move || {
124 const CHANGE_BUF_SIZE: usize = 1024;
125
126 let sync_root = OpenOptions::new()
127 .access_mode(FILE_LIST_DIRECTORY.0)
128 .custom_flags((FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED).0)
129 .open(&path)
130 .expect("sync root directory is opened");
131 let mut changes_buf = MaybeUninit::<[u8; CHANGE_BUF_SIZE]>::zeroed();
132 let mut overlapped = MaybeUninit::zeroed();
133 let mut transferred = MaybeUninit::zeroed();
134
135 while matches!(rx.try_recv(), Err(TryRecvError::Empty)) {
136 unsafe {
137 ReadDirectoryChangesW(
138 HANDLE(sync_root.as_raw_handle() as _),
139 changes_buf.as_mut_ptr() as *mut _,
140 CHANGE_BUF_SIZE as _,
141 true,
142 FILE_NOTIFY_CHANGE_ATTRIBUTES,
143 None,
144 Some(overlapped.as_mut_ptr()),
145 None,
146 )
147 }
148 .expect("read directory changes");
149
150 loop {
151 if let Err(e) = unsafe {
152 GetOverlappedResult(
153 HANDLE(sync_root.as_raw_handle() as _),
154 overlapped.as_ptr(),
155 transferred.as_mut_ptr(),
156 false,
157 )
158 } {
159 if e.code() != ERROR_IO_INCOMPLETE.to_hresult() {
160 panic!(
161 "get overlapped result: {:?}, expected: {ERROR_IO_INCOMPLETE:?}",
162 WIN32_ERROR::from_error(&e),
163 );
164 }
165
166 if !matches!(rx.try_recv(), Err(TryRecvError::Empty)) {
168 _ = unsafe {
169 CancelIoEx(
170 HANDLE(sync_root.as_raw_handle() as _),
171 Some(overlapped.as_ptr()),
172 )
173 };
174 return;
175 }
176
177 thread::sleep(Duration::from_millis(300));
178 continue;
179 }
180
181 if unsafe { transferred.assume_init() } == 0 {
182 break;
183 }
184
185 let mut changes = Vec::with_capacity(8);
186 let mut entry = changes_buf.as_ptr() as *const FILE_NOTIFY_INFORMATION;
187 loop {
188 let relative = unsafe {
189 U16Str::from_ptr(
190 &(*entry).FileName as *const _,
191 (*entry).FileNameLength as usize / mem::size_of::<u16>(),
192 )
193 };
194
195 changes.push(path.join(relative.to_os_string()));
196
197 if unsafe { *entry }.NextEntryOffset == 0 {
198 break;
199 }
200 entry = unsafe { entry.byte_add((*entry).NextEntryOffset as _) };
201 }
202
203 filter.state_changed(changes);
204 break;
205 }
206 }
207 });
208
209 (tx, handle)
210}