Skip to main content

microsandbox_agentd/
fs.rs

1//! Guest-side filesystem operation handlers.
2//!
3//! Handles `core.fs.*` protocol messages by performing filesystem operations
4//! using `std::fs` and `tokio::fs`, then sending responses back to the host.
5
6use std::collections::HashMap;
7use std::ffi::CString;
8use std::os::fd::AsRawFd;
9use std::os::unix::ffi::OsStrExt;
10use std::os::unix::fs::{MetadataExt, PermissionsExt};
11use std::path::Path;
12use std::sync::Arc;
13
14use microsandbox_protocol::AGENT_RELAY_ID_RANGE_STEP;
15use microsandbox_protocol::codec;
16use microsandbox_protocol::fs::{
17    FS_CHUNK_SIZE, FsData, FsEntryInfo, FsOp, FsOpenOptions, FsRequest, FsResponse, FsResponseData,
18    FsSetAttrs,
19};
20use microsandbox_protocol::message::{Message, MessageType};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use tokio::sync::{Mutex, mpsc};
23use tokio::task::JoinHandle;
24
25use crate::session::SessionOutput;
26
27//--------------------------------------------------------------------------------------------------
28// Constants
29//--------------------------------------------------------------------------------------------------
30
31/// Default maximum number of entries returned by one `ReadDir` request.
32const DEFAULT_READ_DIR_LIMIT: u32 = 128;
33
34/// Maximum number of open filesystem handles owned by one relay client.
35const MAX_OPEN_HANDLES_PER_OWNER: usize = 1024;
36
37//--------------------------------------------------------------------------------------------------
38// Types
39//--------------------------------------------------------------------------------------------------
40
41/// Mutable filesystem protocol state held by agentd.
42#[derive(Default)]
43pub struct FsState {
44    next_handle: u64,
45    handles: HashMap<u64, FsHandleEntry>,
46}
47
48/// Tracks an in-progress streaming write operation.
49pub struct FsWriteSession {
50    owner_id: u32,
51    handle: u64,
52    file: Arc<Mutex<tokio::fs::File>>,
53    offset: u64,
54    append: bool,
55    expected_len: Option<u64>,
56    written: u64,
57}
58
59/// Tracks an in-progress streaming read operation.
60pub struct FsReadSession {
61    owner_id: u32,
62    handle: u64,
63    task: JoinHandle<()>,
64}
65
66/// A filesystem stream session started by a request.
67pub enum FsStreamSession {
68    /// Read stream task.
69    Read(FsReadSession),
70
71    /// Write stream awaiting `FsData` chunks.
72    Write(FsWriteSession),
73}
74
75enum FsHandleEntry {
76    File {
77        owner_id: u32,
78        file: Arc<Mutex<tokio::fs::File>>,
79        read: bool,
80        write: bool,
81        append: bool,
82        path: String,
83    },
84    Dir {
85        owner_id: u32,
86        dir: Arc<Mutex<tokio::fs::ReadDir>>,
87        path: String,
88    },
89}
90
91//--------------------------------------------------------------------------------------------------
92// Methods
93//--------------------------------------------------------------------------------------------------
94
95impl FsState {
96    /// Close handles opened by a disconnected relay client.
97    pub fn close_owner_range(&mut self, id_start: u32, id_end_exclusive: u32) {
98        self.handles.retain(|_, handle| {
99            let owner_id = handle.owner_id();
100            owner_id < id_start || owner_id >= id_end_exclusive
101        });
102    }
103
104    /// Close all handles.
105    pub fn clear(&mut self) {
106        self.handles.clear();
107    }
108
109    fn insert_file(
110        &mut self,
111        owner_id: u32,
112        file: tokio::fs::File,
113        read: bool,
114        write: bool,
115        append: bool,
116        path: String,
117    ) -> Result<u64, String> {
118        self.enforce_owner_limit(owner_id)?;
119        let handle = self.alloc_handle();
120        self.handles.insert(
121            handle,
122            FsHandleEntry::File {
123                owner_id,
124                file: Arc::new(Mutex::new(file)),
125                read,
126                write,
127                append,
128                path,
129            },
130        );
131        Ok(handle)
132    }
133
134    fn insert_dir(
135        &mut self,
136        owner_id: u32,
137        dir: tokio::fs::ReadDir,
138        path: String,
139    ) -> Result<u64, String> {
140        self.enforce_owner_limit(owner_id)?;
141        let handle = self.alloc_handle();
142        self.handles.insert(
143            handle,
144            FsHandleEntry::Dir {
145                owner_id,
146                dir: Arc::new(Mutex::new(dir)),
147                path,
148            },
149        );
150        Ok(handle)
151    }
152
153    fn close_handle(&mut self, caller_id: u32, handle: u64) -> Result<FsHandleEntry, String> {
154        if let Some(entry) = self.handles.get(&handle) {
155            entry.ensure_owner(handle, caller_id)?;
156        }
157        self.handles
158            .remove(&handle)
159            .ok_or_else(|| format!("invalid handle: {handle}"))
160    }
161
162    fn file(
163        &self,
164        caller_id: u32,
165        handle: u64,
166        need_read: bool,
167        need_write: bool,
168    ) -> Result<(Arc<Mutex<tokio::fs::File>>, bool, String), String> {
169        match self.handles.get(&handle) {
170            Some(FsHandleEntry::File {
171                file,
172                read,
173                write,
174                append,
175                path,
176                ..
177            }) => {
178                self.handles
179                    .get(&handle)
180                    .expect("entry just matched")
181                    .ensure_owner(handle, caller_id)?;
182                if need_read && !read {
183                    return Err(format!("handle {handle} is not open for reading"));
184                }
185                if need_write && !write && !append {
186                    return Err(format!("handle {handle} is not open for writing"));
187                }
188                Ok((Arc::clone(file), *append, path.clone()))
189            }
190            Some(FsHandleEntry::Dir { .. }) => Err(format!("handle {handle} is a directory")),
191            None => Err(format!("invalid handle: {handle}")),
192        }
193    }
194
195    fn dir(
196        &self,
197        caller_id: u32,
198        handle: u64,
199    ) -> Result<(Arc<Mutex<tokio::fs::ReadDir>>, String), String> {
200        match self.handles.get(&handle) {
201            Some(FsHandleEntry::Dir { dir, path, .. }) => {
202                self.handles
203                    .get(&handle)
204                    .expect("entry just matched")
205                    .ensure_owner(handle, caller_id)?;
206                Ok((Arc::clone(dir), path.clone()))
207            }
208            Some(FsHandleEntry::File { .. }) => Err(format!("handle {handle} is a file")),
209            None => Err(format!("invalid handle: {handle}")),
210        }
211    }
212
213    fn alloc_handle(&mut self) -> u64 {
214        self.next_handle = self.next_handle.wrapping_add(1).max(1);
215        while self.handles.contains_key(&self.next_handle) {
216            self.next_handle = self.next_handle.wrapping_add(1).max(1);
217        }
218        self.next_handle
219    }
220
221    fn enforce_owner_limit(&self, owner_id: u32) -> Result<(), String> {
222        let count = self
223            .handles
224            .values()
225            .filter(|entry| same_relay_client(entry.owner_id(), owner_id))
226            .count();
227        if count >= MAX_OPEN_HANDLES_PER_OWNER {
228            return Err(format!(
229                "too many open filesystem handles for relay client: {count}"
230            ));
231        }
232        Ok(())
233    }
234}
235
236impl FsHandleEntry {
237    fn owner_id(&self) -> u32 {
238        match self {
239            Self::File { owner_id, .. } | Self::Dir { owner_id, .. } => *owner_id,
240        }
241    }
242
243    fn ensure_owner(&self, handle: u64, caller_id: u32) -> Result<(), String> {
244        if same_relay_client(self.owner_id(), caller_id) {
245            Ok(())
246        } else {
247            Err(format!(
248                "handle {handle} is owned by a different relay client"
249            ))
250        }
251    }
252}
253
254impl FsReadSession {
255    /// Correlation ID whose relay client owns this read stream.
256    pub fn owner_id(&self) -> u32 {
257        self.owner_id
258    }
259
260    /// Filesystem handle being read.
261    pub fn handle(&self) -> u64 {
262        self.handle
263    }
264
265    /// Abort the background read task.
266    pub fn abort(self) {
267        self.task.abort();
268    }
269}
270
271impl FsWriteSession {
272    /// Correlation ID whose relay client owns this write stream.
273    pub fn owner_id(&self) -> u32 {
274        self.owner_id
275    }
276
277    /// Filesystem handle being written.
278    pub fn handle(&self) -> u64 {
279        self.handle
280    }
281}
282
283//--------------------------------------------------------------------------------------------------
284// Functions
285//--------------------------------------------------------------------------------------------------
286
287fn relay_client_slot(id: u32) -> Option<u32> {
288    if id == 0 {
289        None
290    } else {
291        Some((id - 1) / AGENT_RELAY_ID_RANGE_STEP)
292    }
293}
294
295fn same_relay_client(left: u32, right: u32) -> bool {
296    relay_client_slot(left).is_some_and(|left| Some(left) == relay_client_slot(right))
297}
298
299/// Handles an incoming `FsRequest` message.
300pub async fn handle_fs_request(
301    id: u32,
302    req: FsRequest,
303    state: &mut FsState,
304    out_buf: &mut Vec<u8>,
305    session_tx: &mpsc::UnboundedSender<(u32, SessionOutput)>,
306) -> Result<Option<FsStreamSession>, String> {
307    match req.op {
308        FsOp::RealPath { path } => {
309            let resp = handle_realpath(&path).await;
310            encode_response(id, resp, out_buf)?;
311            Ok(None)
312        }
313        FsOp::Stat {
314            path,
315            follow_symlink,
316        } => {
317            let resp = handle_stat(&path, follow_symlink).await;
318            encode_response(id, resp, out_buf)?;
319            Ok(None)
320        }
321        FsOp::SetStat {
322            path,
323            follow_symlink,
324            attrs,
325        } => {
326            let resp = handle_setstat(&path, follow_symlink, attrs).await;
327            encode_response(id, resp, out_buf)?;
328            Ok(None)
329        }
330        FsOp::List { path } => {
331            let resp = handle_list(&path).await;
332            encode_response(id, resp, out_buf)?;
333            Ok(None)
334        }
335        FsOp::ReadLink { path } => {
336            let resp = handle_readlink(&path).await;
337            encode_response(id, resp, out_buf)?;
338            Ok(None)
339        }
340        FsOp::Symlink { target, link_path } => {
341            let resp = handle_symlink(&target, &link_path).await;
342            encode_response(id, resp, out_buf)?;
343            Ok(None)
344        }
345        FsOp::Mkdir { path, mode } => {
346            let resp = handle_mkdir(&path, mode).await;
347            encode_response(id, resp, out_buf)?;
348            Ok(None)
349        }
350        FsOp::Remove { path } => {
351            let resp = handle_remove(&path).await;
352            encode_response(id, resp, out_buf)?;
353            Ok(None)
354        }
355        FsOp::RemoveDir { path, recursive } => {
356            let resp = handle_remove_dir(&path, recursive).await;
357            encode_response(id, resp, out_buf)?;
358            Ok(None)
359        }
360        FsOp::Copy { src, dst } => {
361            let resp = handle_copy(&src, &dst).await;
362            encode_response(id, resp, out_buf)?;
363            Ok(None)
364        }
365        FsOp::Rename { src, dst } => {
366            let resp = handle_rename(&src, &dst).await;
367            encode_response(id, resp, out_buf)?;
368            Ok(None)
369        }
370        FsOp::OpenFile { path, options } => {
371            let resp = handle_open_file(id, state, &path, options).await;
372            encode_response(id, resp, out_buf)?;
373            Ok(None)
374        }
375        FsOp::OpenDir { path } => {
376            let resp = handle_open_dir(id, state, &path).await;
377            encode_response(id, resp, out_buf)?;
378            Ok(None)
379        }
380        FsOp::CloseHandle { handle } => {
381            let resp = handle_close_handle(id, state, handle).await;
382            encode_response(id, resp, out_buf)?;
383            Ok(None)
384        }
385        FsOp::Read {
386            handle,
387            offset,
388            len,
389        } => match state.file(id, handle, true, false) {
390            Ok((file, _, _)) => {
391                let tx = session_tx.clone();
392                let task = tokio::spawn(async move {
393                    handle_read_stream(id, file, offset, len, &tx).await;
394                });
395                Ok(Some(FsStreamSession::Read(FsReadSession {
396                    owner_id: id,
397                    handle,
398                    task,
399                })))
400            }
401            Err(e) => {
402                encode_response(id, error_response(format!("read: {e}")), out_buf)?;
403                Ok(None)
404            }
405        },
406        FsOp::Write {
407            handle,
408            offset,
409            len,
410        } => match state.file(id, handle, false, true) {
411            Ok((file, append, _)) => Ok(Some(FsStreamSession::Write(FsWriteSession {
412                owner_id: id,
413                handle,
414                file,
415                offset,
416                append,
417                expected_len: len,
418                written: 0,
419            }))),
420            Err(e) => {
421                encode_response(id, error_response(format!("write: {e}")), out_buf)?;
422                Ok(None)
423            }
424        },
425        FsOp::ReadDir { handle, limit } => {
426            let resp = handle_read_dir(id, state, handle, limit).await;
427            encode_response(id, resp, out_buf)?;
428            Ok(None)
429        }
430        FsOp::FStat { handle } => {
431            let resp = handle_fstat(id, state, handle).await;
432            encode_response(id, resp, out_buf)?;
433            Ok(None)
434        }
435        FsOp::FSetStat { handle, attrs } => {
436            let resp = handle_fsetstat(id, state, handle, attrs).await;
437            encode_response(id, resp, out_buf)?;
438            Ok(None)
439        }
440    }
441}
442
443/// Handles an incoming `FsData` message for a streaming write session.
444///
445/// If `data` is empty, the file is flushed and a terminal `FsResponse` is sent.
446/// Returns `true` if the session should be removed (EOF received).
447pub async fn handle_fs_data(
448    id: u32,
449    data: FsData,
450    session: &mut FsWriteSession,
451    out_buf: &mut Vec<u8>,
452) -> Result<bool, String> {
453    if data.data.is_empty() {
454        if let Some(expected) = session.expected_len
455            && session.written != expected
456        {
457            let resp = error_response(format!(
458                "write length mismatch: expected {expected}, wrote {}",
459                session.written
460            ));
461            encode_response(id, resp, out_buf)?;
462            return Ok(true);
463        }
464
465        if let Some(expected) = session.expected_len {
466            let next_written = session.written.saturating_add(data.data.len() as u64);
467            if next_written > expected {
468                let resp = error_response(format!(
469                    "write length mismatch: expected {expected}, received at least {next_written}"
470                ));
471                encode_response(id, resp, out_buf)?;
472                return Ok(true);
473            }
474        }
475
476        let mut file = session.file.lock().await;
477        if let Err(e) = file.flush().await {
478            encode_response(id, error_response(format!("flush: {e}")), out_buf)?;
479            return Ok(true);
480        }
481
482        encode_response(id, ok_response(None), out_buf)?;
483        Ok(true)
484    } else {
485        let mut file = session.file.lock().await;
486        if !session.append
487            && let Err(e) = file.seek(std::io::SeekFrom::Start(session.offset)).await
488        {
489            encode_response(id, error_response(format!("seek: {e}")), out_buf)?;
490            return Ok(true);
491        }
492        if let Err(e) = file.write_all(&data.data).await {
493            encode_response(id, error_response(format!("write: {e}")), out_buf)?;
494            return Ok(true);
495        }
496        session.offset = session.offset.saturating_add(data.data.len() as u64);
497        session.written = session.written.saturating_add(data.data.len() as u64);
498        Ok(false)
499    }
500}
501
502//--------------------------------------------------------------------------------------------------
503// Functions: Handlers
504//--------------------------------------------------------------------------------------------------
505
506async fn handle_realpath(path: &str) -> FsResponse {
507    match realpath(path).await {
508        Ok(path) => ok_response(Some(FsResponseData::Path(path))),
509        Err(e) => error_response(format!("realpath: {e}")),
510    }
511}
512
513async fn handle_stat(path: &str, follow_symlink: bool) -> FsResponse {
514    let result = if follow_symlink {
515        tokio::fs::metadata(path).await
516    } else {
517        tokio::fs::symlink_metadata(path).await
518    };
519
520    match result {
521        Ok(meta) => ok_response(Some(FsResponseData::Stat(metadata_to_entry_info(
522            path, &meta,
523        )))),
524        Err(e) => error_response(format!("stat: {e}")),
525    }
526}
527
528async fn handle_setstat(path: &str, follow_symlink: bool, attrs: FsSetAttrs) -> FsResponse {
529    match apply_path_attrs(path, follow_symlink, attrs).await {
530        Ok(()) => ok_response(None),
531        Err(e) => error_response(format!("setstat: {e}")),
532    }
533}
534
535async fn handle_list(path: &str) -> FsResponse {
536    match read_all_dir(path).await {
537        Ok(entries) => ok_response(Some(FsResponseData::List(entries))),
538        Err(e) => error_response(format!("readdir: {e}")),
539    }
540}
541
542async fn handle_readlink(path: &str) -> FsResponse {
543    match tokio::fs::read_link(path).await {
544        Ok(target) => ok_response(Some(FsResponseData::Path(
545            target.to_string_lossy().to_string(),
546        ))),
547        Err(e) => error_response(format!("readlink: {e}")),
548    }
549}
550
551async fn handle_symlink(target: &str, link_path: &str) -> FsResponse {
552    let target = target.to_string();
553    let link_path = link_path.to_string();
554    match tokio::task::spawn_blocking(move || std::os::unix::fs::symlink(target, link_path)).await {
555        Ok(Ok(())) => ok_response(None),
556        Ok(Err(e)) => error_response(format!("symlink: {e}")),
557        Err(e) => error_response(format!("symlink task: {e}")),
558    }
559}
560
561async fn handle_open_file(
562    id: u32,
563    state: &mut FsState,
564    path: &str,
565    options: FsOpenOptions,
566) -> FsResponse {
567    let mut open_options = tokio::fs::OpenOptions::new();
568    open_options
569        .read(options.read)
570        .write(options.write)
571        .append(options.append)
572        .create(options.create)
573        .truncate(options.truncate)
574        .create_new(options.create_new);
575    if let Some(mode) = options.mode {
576        open_options.mode(mode);
577    }
578
579    match open_options.open(path).await {
580        Ok(file) => match state.insert_file(
581            id,
582            file,
583            options.read,
584            options.write,
585            options.append,
586            path.to_string(),
587        ) {
588            Ok(handle) => ok_response(Some(FsResponseData::Handle(handle))),
589            Err(e) => error_response(format!("open: {e}")),
590        },
591        Err(e) => error_response(format!("open: {e}")),
592    }
593}
594
595async fn handle_open_dir(id: u32, state: &mut FsState, path: &str) -> FsResponse {
596    match tokio::fs::read_dir(path).await {
597        Ok(dir) => match state.insert_dir(id, dir, path.to_string()) {
598            Ok(handle) => ok_response(Some(FsResponseData::Handle(handle))),
599            Err(e) => error_response(format!("opendir: {e}")),
600        },
601        Err(e) => error_response(format!("opendir: {e}")),
602    }
603}
604
605async fn handle_close_handle(id: u32, state: &mut FsState, handle: u64) -> FsResponse {
606    match state.close_handle(id, handle) {
607        Ok(FsHandleEntry::File { file, .. }) => {
608            let mut file = file.lock().await;
609            match file.flush().await {
610                Ok(()) => ok_response(None),
611                Err(e) => error_response(format!("close: {e}")),
612            }
613        }
614        Ok(FsHandleEntry::Dir { .. }) => ok_response(None),
615        Err(e) => error_response(format!("close: {e}")),
616    }
617}
618
619async fn handle_read_dir(id: u32, state: &FsState, handle: u64, limit: Option<u32>) -> FsResponse {
620    let (dir, path) = match state.dir(id, handle) {
621        Ok(v) => v,
622        Err(e) => return error_response(format!("readdir: {e}")),
623    };
624
625    let limit = limit.unwrap_or(DEFAULT_READ_DIR_LIMIT).max(1);
626    let mut dir = dir.lock().await;
627    let mut entries = Vec::new();
628
629    for _ in 0..limit {
630        match dir.next_entry().await {
631            Ok(Some(entry)) => {
632                let entry_path = entry.path();
633                let path_str = entry_path.to_string_lossy().to_string();
634                match tokio::fs::symlink_metadata(&entry_path).await {
635                    Ok(meta) => entries.push(metadata_to_entry_info(&path_str, &meta)),
636                    Err(_) => entries.push(unknown_entry_info(&path_str)),
637                }
638            }
639            Ok(None) => break,
640            Err(e) => return error_response(format!("readdir {path}: {e}")),
641        }
642    }
643
644    ok_response(Some(FsResponseData::List(entries)))
645}
646
647async fn handle_fstat(id: u32, state: &FsState, handle: u64) -> FsResponse {
648    match state.handles.get(&handle) {
649        Some(FsHandleEntry::File { file, path, .. }) => {
650            if let Err(e) = state
651                .handles
652                .get(&handle)
653                .expect("entry just matched")
654                .ensure_owner(handle, id)
655            {
656                return error_response(format!("fstat: {e}"));
657            }
658            let file = file.lock().await;
659            match file.metadata().await {
660                Ok(meta) => ok_response(Some(FsResponseData::Stat(metadata_to_entry_info(
661                    path, &meta,
662                )))),
663                Err(e) => error_response(format!("fstat: {e}")),
664            }
665        }
666        Some(FsHandleEntry::Dir { path, .. }) => {
667            if let Err(e) = state
668                .handles
669                .get(&handle)
670                .expect("entry just matched")
671                .ensure_owner(handle, id)
672            {
673                return error_response(format!("fstat: {e}"));
674            }
675            match tokio::fs::metadata(path).await {
676                Ok(meta) => ok_response(Some(FsResponseData::Stat(metadata_to_entry_info(
677                    path, &meta,
678                )))),
679                Err(e) => error_response(format!("fstat: {e}")),
680            }
681        }
682        None => error_response(format!("fstat: invalid handle: {handle}")),
683    }
684}
685
686async fn handle_fsetstat(id: u32, state: &FsState, handle: u64, attrs: FsSetAttrs) -> FsResponse {
687    let (file, _, path) = match state.file(id, handle, false, false) {
688        Ok(v) => v,
689        Err(e) => return error_response(format!("fsetstat: {e}")),
690    };
691
692    let mut file = file.lock().await;
693    match apply_file_attrs(&mut file, &path, attrs).await {
694        Ok(()) => ok_response(None),
695        Err(e) => error_response(format!("fsetstat: {e}")),
696    }
697}
698
699async fn handle_mkdir(path: &str, mode: Option<u32>) -> FsResponse {
700    match tokio::fs::create_dir_all(path).await {
701        Ok(()) => {
702            if let Some(mode) = mode
703                && let Err(e) =
704                    tokio::fs::set_permissions(path, std::fs::Permissions::from_mode(mode)).await
705            {
706                return error_response(format!("chmod: {e}"));
707            }
708            ok_response(None)
709        }
710        Err(e) => error_response(format!("mkdir: {e}")),
711    }
712}
713
714async fn handle_remove(path: &str) -> FsResponse {
715    match tokio::fs::remove_file(path).await {
716        Ok(()) => ok_response(None),
717        Err(e) => error_response(format!("remove: {e}")),
718    }
719}
720
721async fn handle_remove_dir(path: &str, recursive: bool) -> FsResponse {
722    let result = if recursive {
723        tokio::fs::remove_dir_all(path).await
724    } else {
725        tokio::fs::remove_dir(path).await
726    };
727    match result {
728        Ok(()) => ok_response(None),
729        Err(e) => error_response(format!("remove_dir: {e}")),
730    }
731}
732
733async fn handle_copy(src: &str, dst: &str) -> FsResponse {
734    match tokio::fs::copy(src, dst).await {
735        Ok(_) => ok_response(None),
736        Err(e) => error_response(format!("copy: {e}")),
737    }
738}
739
740async fn handle_rename(src: &str, dst: &str) -> FsResponse {
741    match tokio::fs::rename(src, dst).await {
742        Ok(()) => ok_response(None),
743        Err(e) => error_response(format!("rename: {e}")),
744    }
745}
746
747async fn handle_read_stream(
748    id: u32,
749    file: Arc<Mutex<tokio::fs::File>>,
750    offset: u64,
751    len: Option<u64>,
752    tx: &mpsc::UnboundedSender<(u32, SessionOutput)>,
753) {
754    let mut file = file.lock().await;
755    if let Err(e) = file.seek(std::io::SeekFrom::Start(offset)).await {
756        send_raw_response(id, false, Some(format!("seek: {e}")), None, tx);
757        return;
758    }
759
760    let mut remaining = len;
761    let mut chunk = vec![0u8; FS_CHUNK_SIZE];
762    let mut buf = Vec::new();
763
764    loop {
765        let read_len = match remaining {
766            Some(0) => break,
767            Some(n) => chunk.len().min(n as usize),
768            None => chunk.len(),
769        };
770
771        match file.read(&mut chunk[..read_len]).await {
772            Ok(0) => break,
773            Ok(n) => {
774                if let Some(ref mut remaining) = remaining {
775                    *remaining = remaining.saturating_sub(n as u64);
776                }
777                let data = FsData {
778                    data: chunk[..n].to_vec(),
779                };
780                let msg = match Message::with_payload(MessageType::FsData, id, &data) {
781                    Ok(msg) => msg,
782                    Err(e) => {
783                        send_raw_response(id, false, Some(format!("encode chunk: {e}")), None, tx);
784                        return;
785                    }
786                };
787                buf.clear();
788                if let Err(e) = codec::encode_to_buf(&msg, &mut buf) {
789                    send_raw_response(
790                        id,
791                        false,
792                        Some(format!("encode chunk frame: {e}")),
793                        None,
794                        tx,
795                    );
796                    return;
797                }
798                if tx.send((id, SessionOutput::Raw(buf.clone()))).is_err() {
799                    return;
800                }
801            }
802            Err(e) => {
803                send_raw_response(id, false, Some(format!("read: {e}")), None, tx);
804                return;
805            }
806        }
807    }
808
809    send_raw_response(id, true, None, None, tx);
810}
811
812//--------------------------------------------------------------------------------------------------
813// Functions: Attribute Helpers
814//--------------------------------------------------------------------------------------------------
815
816async fn apply_path_attrs(
817    path: &str,
818    follow_symlink: bool,
819    attrs: FsSetAttrs,
820) -> Result<(), String> {
821    if let Some(size) = attrs.size {
822        let file = tokio::fs::OpenOptions::new()
823            .write(true)
824            .open(path)
825            .await
826            .map_err(|e| format!("open for truncate: {e}"))?;
827        file.set_len(size)
828            .await
829            .map_err(|e| format!("set_len: {e}"))?;
830    }
831
832    if let Some(mode) = attrs.mode {
833        if !follow_symlink
834            && tokio::fs::symlink_metadata(path)
835                .await
836                .map_err(|e| format!("lstat before chmod: {e}"))?
837                .file_type()
838                .is_symlink()
839        {
840            return Err("chmod on symlink without following is not supported".into());
841        }
842        tokio::fs::set_permissions(path, std::fs::Permissions::from_mode(mode))
843            .await
844            .map_err(|e| format!("chmod: {e}"))?;
845    }
846
847    if attrs.uid.is_some() || attrs.gid.is_some() {
848        chown_path(path, follow_symlink, attrs.uid, attrs.gid)?;
849    }
850
851    if attrs.atime.is_some() || attrs.mtime.is_some() {
852        set_times_path(path, follow_symlink, attrs.atime, attrs.mtime).await?;
853    }
854
855    Ok(())
856}
857
858async fn apply_file_attrs(
859    file: &mut tokio::fs::File,
860    path: &str,
861    attrs: FsSetAttrs,
862) -> Result<(), String> {
863    if let Some(size) = attrs.size {
864        file.set_len(size)
865            .await
866            .map_err(|e| format!("set_len: {e}"))?;
867    }
868
869    if let Some(mode) = attrs.mode {
870        file.set_permissions(std::fs::Permissions::from_mode(mode))
871            .await
872            .map_err(|e| format!("chmod: {e}"))?;
873    }
874
875    if attrs.uid.is_some() || attrs.gid.is_some() {
876        let uid = attrs.uid.map(|v| v as libc::uid_t).unwrap_or(!0);
877        let gid = attrs.gid.map(|v| v as libc::gid_t).unwrap_or(!0);
878        let rc = unsafe { libc::fchown(file.as_raw_fd(), uid, gid) };
879        if rc != 0 {
880            return Err(format!("fchown: {}", std::io::Error::last_os_error()));
881        }
882    }
883
884    if attrs.atime.is_some() || attrs.mtime.is_some() {
885        set_times_fd(file.as_raw_fd(), path, attrs.atime, attrs.mtime).await?;
886    }
887
888    Ok(())
889}
890
891fn chown_path(
892    path: &str,
893    follow_symlink: bool,
894    uid: Option<u32>,
895    gid: Option<u32>,
896) -> Result<(), String> {
897    let c_path = cstring_path(path)?;
898    let uid = uid.map(|v| v as libc::uid_t).unwrap_or(!0);
899    let gid = gid.map(|v| v as libc::gid_t).unwrap_or(!0);
900    let rc = unsafe {
901        if follow_symlink {
902            libc::chown(c_path.as_ptr(), uid, gid)
903        } else {
904            libc::lchown(c_path.as_ptr(), uid, gid)
905        }
906    };
907    if rc != 0 {
908        return Err(format!("chown: {}", std::io::Error::last_os_error()));
909    }
910    Ok(())
911}
912
913async fn set_times_path(
914    path: &str,
915    follow_symlink: bool,
916    atime: Option<i64>,
917    mtime: Option<i64>,
918) -> Result<(), String> {
919    let meta = if follow_symlink {
920        tokio::fs::metadata(path).await
921    } else {
922        tokio::fs::symlink_metadata(path).await
923    }
924    .map_err(|e| format!("stat before utimensat: {e}"))?;
925    let times = timespecs(atime.unwrap_or(meta.atime()), mtime.unwrap_or(meta.mtime()));
926    let c_path = cstring_path(path)?;
927    let flags = if follow_symlink {
928        0
929    } else {
930        libc::AT_SYMLINK_NOFOLLOW
931    };
932    let rc = unsafe { libc::utimensat(libc::AT_FDCWD, c_path.as_ptr(), times.as_ptr(), flags) };
933    if rc != 0 {
934        return Err(format!("utimensat: {}", std::io::Error::last_os_error()));
935    }
936    Ok(())
937}
938
939async fn set_times_fd(
940    fd: std::os::fd::RawFd,
941    path: &str,
942    atime: Option<i64>,
943    mtime: Option<i64>,
944) -> Result<(), String> {
945    let meta = tokio::fs::metadata(path)
946        .await
947        .map_err(|e| format!("stat before futimens: {e}"))?;
948    let times = timespecs(atime.unwrap_or(meta.atime()), mtime.unwrap_or(meta.mtime()));
949    let rc = unsafe { libc::futimens(fd, times.as_ptr()) };
950    if rc != 0 {
951        return Err(format!("futimens: {}", std::io::Error::last_os_error()));
952    }
953    Ok(())
954}
955
956fn timespecs(atime: i64, mtime: i64) -> [libc::timespec; 2] {
957    [
958        libc::timespec {
959            tv_sec: atime as _,
960            tv_nsec: 0,
961        },
962        libc::timespec {
963            tv_sec: mtime as _,
964            tv_nsec: 0,
965        },
966    ]
967}
968
969//--------------------------------------------------------------------------------------------------
970// Functions: Helpers
971//--------------------------------------------------------------------------------------------------
972
973fn encode_response(id: u32, resp: FsResponse, out_buf: &mut Vec<u8>) -> Result<(), String> {
974    let msg = Message::with_payload(MessageType::FsResponse, id, &resp)
975        .map_err(|e| format!("encode fs response: {e}"))?;
976    codec::encode_to_buf(&msg, out_buf).map_err(|e| format!("encode fs response frame: {e}"))?;
977    Ok(())
978}
979
980fn send_raw_response(
981    id: u32,
982    ok: bool,
983    error: Option<String>,
984    data: Option<FsResponseData>,
985    tx: &mpsc::UnboundedSender<(u32, SessionOutput)>,
986) {
987    let resp = FsResponse { ok, error, data };
988    match Message::with_payload(MessageType::FsResponse, id, &resp) {
989        Ok(msg) => {
990            let mut buf = Vec::new();
991            match codec::encode_to_buf(&msg, &mut buf) {
992                Ok(()) => {
993                    let _ = tx.send((id, SessionOutput::Raw(buf)));
994                }
995                Err(e) => {
996                    eprintln!("failed to encode fs response frame for {id}: {e}");
997                }
998            }
999        }
1000        Err(e) => {
1001            eprintln!("failed to encode fs response for {id}: {e}");
1002        }
1003    }
1004}
1005
1006async fn realpath(path: &str) -> Result<String, String> {
1007    match tokio::fs::canonicalize(path).await {
1008        Ok(path) => Ok(path.to_string_lossy().to_string()),
1009        Err(original_error) => {
1010            let path = Path::new(path);
1011            let Some(parent) = path.parent() else {
1012                return Err(original_error.to_string());
1013            };
1014            let parent = tokio::fs::canonicalize(parent)
1015                .await
1016                .map_err(|_| original_error.to_string())?;
1017            let resolved = match path.file_name() {
1018                Some(name) => parent.join(name),
1019                None => parent,
1020            };
1021            Ok(resolved.to_string_lossy().to_string())
1022        }
1023    }
1024}
1025
1026async fn read_all_dir(path: &str) -> Result<Vec<FsEntryInfo>, String> {
1027    let mut dir = tokio::fs::read_dir(path)
1028        .await
1029        .map_err(|e| format!("opendir: {e}"))?;
1030    let mut entries = Vec::new();
1031
1032    loop {
1033        match dir.next_entry().await {
1034            Ok(Some(entry)) => {
1035                let entry_path = entry.path();
1036                let path_str = entry_path.to_string_lossy().to_string();
1037                match tokio::fs::symlink_metadata(&entry_path).await {
1038                    Ok(meta) => entries.push(metadata_to_entry_info(&path_str, &meta)),
1039                    Err(_) => entries.push(unknown_entry_info(&path_str)),
1040                }
1041            }
1042            Ok(None) => break,
1043            Err(e) => return Err(e.to_string()),
1044        }
1045    }
1046
1047    Ok(entries)
1048}
1049
1050fn ok_response(data: Option<FsResponseData>) -> FsResponse {
1051    FsResponse {
1052        ok: true,
1053        error: None,
1054        data,
1055    }
1056}
1057
1058fn error_response(error: String) -> FsResponse {
1059    FsResponse {
1060        ok: false,
1061        error: Some(error),
1062        data: None,
1063    }
1064}
1065
1066fn metadata_to_entry_info(path: &str, meta: &std::fs::Metadata) -> FsEntryInfo {
1067    let kind = if meta.is_file() {
1068        "file"
1069    } else if meta.is_dir() {
1070        "dir"
1071    } else if meta.is_symlink() {
1072        "symlink"
1073    } else {
1074        "other"
1075    };
1076
1077    let mtime = Some(meta.mtime());
1078    let atime = Some(meta.atime());
1079
1080    FsEntryInfo {
1081        path: path.to_string(),
1082        kind: kind.to_string(),
1083        size: meta.len(),
1084        mode: meta.mode(),
1085        modified: mtime,
1086        uid: meta.uid(),
1087        gid: meta.gid(),
1088        atime,
1089        mtime,
1090    }
1091}
1092
1093fn unknown_entry_info(path: &str) -> FsEntryInfo {
1094    FsEntryInfo {
1095        path: path.to_string(),
1096        kind: "other".to_string(),
1097        size: 0,
1098        mode: 0,
1099        modified: None,
1100        uid: 0,
1101        gid: 0,
1102        atime: None,
1103        mtime: None,
1104    }
1105}
1106
1107fn cstring_path(path: impl AsRef<Path>) -> Result<CString, String> {
1108    CString::new(path.as_ref().as_os_str().as_bytes())
1109        .map_err(|e| format!("path contains NUL: {e}"))
1110}