1#![allow(unused_imports)]
2
3use std::{
4 collections::{HashMap, VecDeque},
5 ffi::{OsStr, OsString},
6};
7
8use super::inode_mapping::FileIdResolver;
9use crate::fuse_handler::FuseHandler;
10use crate::types::*;
11
12type DirIter<TAttr> = HashMap<(u64, i64), VecDeque<(OsString, u64, TAttr)>>;
13
14#[cfg(feature = "serial")]
15mod serial {
16 use super::*;
17
18 use std::cell::RefCell;
19
20 pub(crate) struct FuseDriver<TId, THandler>
21 where
22 TId: FileIdType,
23 THandler: FuseHandler<TId>,
24 {
25 handler: THandler,
26 resolver: TId::Resolver,
27 dirmap_iter: RefCell<DirIter<FileKind>>,
28 dirmapplus_iter: RefCell<DirIter<FileAttribute>>,
29 }
30
31 impl<TId, THandler> FuseDriver<TId, THandler>
32 where
33 TId: FileIdType,
34 THandler: FuseHandler<TId>,
35 {
36 pub fn new(handler: THandler, _num_threads: usize) -> FuseDriver<TId, THandler> {
38 FuseDriver {
39 handler,
40 resolver: TId::Resolver::new(),
41 dirmap_iter: RefCell::new(HashMap::new()),
42 dirmapplus_iter: RefCell::new(HashMap::new()),
43 }
44 }
45
46 pub fn get_handler(&self) -> &THandler {
47 &self.handler
48 }
49
50 pub fn get_resolver(&self) -> &TId::Resolver {
51 &self.resolver
52 }
53
54 pub fn get_dirmap_iter(&self) -> &RefCell<DirIter<FileKind>> {
55 &self.dirmap_iter
56 }
57
58 pub fn get_dirmapplus_iter(&self) -> &RefCell<DirIter<FileAttribute>> {
59 &self.dirmapplus_iter
60 }
61 }
62
63 macro_rules! execute_task {
64 ($self:expr, $block:block) => {
65 $block
66 };
67 }
68
69 macro_rules! reply_executor {
70 ($self:expr) => {
71 ()
72 };
73 }
74
75 macro_rules! execute_reply_task {
76 ($reply_executor:expr, $block:block) => {
77 $block
78 };
79 }
80
81 pub(crate) use execute_reply_task;
82 pub(crate) use execute_task;
83 pub(crate) use reply_executor;
84}
85
86#[cfg(feature = "parallel")]
87mod parallel {
88 use super::*;
89
90 use std::{
91 sync::Arc,
92 thread::{self, available_parallelism},
93 };
94
95 use threadpool::ThreadPool;
96
97 #[cfg(feature = "deadlock_detection")]
98 use parking_lot::{Mutex, MutexGuard};
99 #[cfg(not(feature = "deadlock_detection"))]
100 use std::sync::{Mutex, MutexGuard};
101
102 pub(crate) struct FuseDriver<TId, THandler>
103 where
104 TId: FileIdType,
105 THandler: FuseHandler<TId>,
106 {
107 handler: Arc<THandler>,
108 resolver: Arc<TId::Resolver>,
109 dirmap_iter: Arc<Mutex<DirIter<FileKind>>>,
110 dirmapplus_iter: Arc<Mutex<DirIter<FileAttribute>>>,
111 pub threadpool: ThreadPool,
112 pub reply_threadpool: ThreadPool,
113 }
114
115 impl<TId, THandler> Drop for FuseDriver<TId, THandler>
116 where
117 TId: FileIdType,
118 THandler: FuseHandler<TId>,
119 {
120 fn drop(&mut self) {
121 self.threadpool.join();
122 self.reply_threadpool.join();
123 }
124 }
125
126 impl<TId, THandler> FuseDriver<TId, THandler>
127 where
128 TId: FileIdType,
129 THandler: FuseHandler<TId>,
130 {
131 pub fn new(handler: THandler, num_threads: usize) -> FuseDriver<TId, THandler> {
132 #[cfg(feature = "deadlock_detection")]
133 spawn_deadlock_checker();
134 FuseDriver {
135 handler: Arc::new(handler),
136 resolver: Arc::new(TId::create_resolver()),
137 dirmap_iter: Arc::new(Mutex::new(HashMap::new())),
138 dirmapplus_iter: Arc::new(Mutex::new(HashMap::new())),
139 threadpool: ThreadPool::new(num_threads),
140 reply_threadpool: ThreadPool::new(num_threads),
141 }
142 }
143
144 pub fn get_handler(&self) -> Arc<THandler> {
145 self.handler.clone()
146 }
147
148 pub fn get_resolver(&self) -> Arc<TId::Resolver> {
149 self.resolver.clone()
150 }
151
152 pub fn get_dirmap_iter(&self) -> Arc<Mutex<DirIter<FileKind>>> {
153 self.dirmap_iter.clone()
154 }
155
156 pub fn get_dirmapplus_iter(&self) -> Arc<Mutex<DirIter<FileAttribute>>> {
157 self.dirmapplus_iter.clone()
158 }
159 }
160
161 macro_rules! execute_task {
162 ($self:expr, $block:block) => {
163 $self.threadpool.execute(move || $block)
164 };
165 }
166
167 macro_rules! reply_executor {
168 ($self:expr) => {
169 $self.reply_threadpool.clone()
170 };
171 }
172
173 macro_rules! execute_reply_task {
174 ($reply_executor:expr, $block:block) => {
175 $reply_executor.execute(move || $block);
176 };
177 }
178
179 pub(crate) use execute_reply_task;
180 pub(crate) use execute_task;
181 pub(crate) use reply_executor;
182}
183
184#[cfg(feature = "async")]
185mod async_task {
186 use super::*;
187
188 use std::sync::Arc;
189 use tokio::runtime::Runtime;
190 use tokio::sync::Mutex;
191
192 pub(crate) struct FuseDriver<TId, THandler>
193 where
194 TId: FileIdType,
195 THandler: FuseHandler<TId>,
196 {
197 handler: Arc<THandler>,
198 resolver: Arc<TId::Resolver>,
199 dirmap_iter: Arc<Mutex<DirIter<FileKind>>>,
200 dirmapplus_iter: Arc<Mutex<DirIter<FileAttribute>>>,
201 pub runtime: Arc<Runtime>,
202 }
203
204 impl<TId, THandler> FuseDriver<TId, THandler>
205 where
206 TId: FileIdType,
207 THandler: FuseHandler<TId>,
208 {
209 pub fn new(handler: THandler, _num_threads: usize) -> FuseDriver<TId, THandler> {
210 #[cfg(feature = "deadlock_detection")]
211 spawn_deadlock_checker();
212 FuseDriver {
213 handler: Arc::new(handler),
214 resolver: Arc::new(TId::create_resolver()),
215 dirmap_iter: Arc::new(Mutex::new(HashMap::new())),
216 dirmapplus_iter: Arc::new(Mutex::new(HashMap::new())),
217 runtime: Arc::new(Runtime::new().unwrap()),
218 }
219 }
220
221 pub fn get_handler(&self) -> Arc<THandler> {
222 self.handler.clone()
223 }
224
225 pub fn get_resolver(&self) -> Arc<TId::Resolver> {
226 self.resolver.clone()
227 }
228
229 pub fn get_dirmap_iter(&self) -> Arc<Mutex<DirIter<FileKind>>> {
230 self.dirmap_iter.clone()
231 }
232
233 pub fn get_dirmapplus_iter(&self) -> Arc<Mutex<DirIter<FileAttribute>>> {
234 self.dirmapplus_iter.clone()
235 }
236 }
237
238 macro_rules! execute_task {
239 ($self:expr, $block:block) => {
240 $self.runtime.spawn(async move { $block })
241 };
242 }
243
244 macro_rules! reply_executor {
245 ($self:expr) => {
246 $self.runtime.clone()
247 };
248 }
249
250 macro_rules! execute_reply_task {
251 ($reply_executor:expr, $block:block) => {
252 $reply_executor.spawn(async move { $block })
253 };
254 }
255
256 pub(crate) use execute_reply_task;
257 pub(crate) use execute_task;
258 pub(crate) use reply_executor;
259}
260
261#[cfg(feature = "deadlock_detection")]
262fn spawn_deadlock_checker() {
263 use log::{error, info};
264 use parking_lot::deadlock;
265 use std::thread;
266 use std::time::Duration;
267
268 thread::spawn(move || {
270 loop {
271 thread::sleep(Duration::from_secs(10));
272 let deadlocks = deadlock::check_deadlock();
273 if deadlocks.is_empty() {
274 info!("# No deadlock");
275 continue;
276 }
277
278 eprintln!("# {} deadlocks detected", deadlocks.len());
279 for (i, threads) in deadlocks.iter().enumerate() {
280 error!("Deadlock #{}", i);
281 for t in threads {
282 error!("Thread Id {:#?}\n, {:#?}", t.thread_id(), t.backtrace());
283 }
284 }
285 }
286 });
287}
288
289#[cfg(feature = "serial")]
290pub use serial::*;
291
292#[cfg(feature = "parallel")]
293pub use parallel::*;
294
295#[cfg(feature = "async")]
296pub use async_task::*;