porttyd 0.2.0

xdg-desktop-portal backend daemon for TTY sessions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
//! Daemon control socket and FIFO for CLI communication
//!
//! Listens on /tmp/portty/<uid>/daemon.sock for CLI requests.
//! Listens on /tmp/portty/<uid>/daemon.ctl for fire-and-forget commands.
//! Owns the session registry. Data operations (edit, clear) are file-based (CLI handles directly).
//! This socket handles control commands: submit, cancel, verify, reset, list.

use std::collections::HashMap;
use std::fs;
use std::io::{BufRead, BufReader};
use std::os::unix::fs::PermissionsExt;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::thread;

use libportty::codec::{read_request, write_response};
use libportty::{Request, Response, SessionInfo};
use libportty::{files, paths};
use tracing::{debug, info, warn};

use crate::portal;
use crate::session::{Session, SessionControl, drain_pending_to};

/// Registry of active portal sessions
#[derive(Debug, Default)]
pub struct SessionRegistry {
    sessions: HashMap<String, RegisteredSession>,
}

/// Metadata about a registered session
pub struct RegisteredSession {
    pub id: String,
    pub portal: String,
    pub operation: String,
    pub title: Option<String>,
    pub created: u64,
    pub dir: PathBuf,
    pub control: Arc<SessionControl>,
    pub initial_entries: Vec<String>,
}

impl std::fmt::Debug for RegisteredSession {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RegisteredSession")
            .field("id", &self.id)
            .field("portal", &self.portal)
            .field("operation", &self.operation)
            .field("title", &self.title)
            .field("created", &self.created)
            .field("dir", &self.dir)
            .finish()
    }
}

impl From<&RegisteredSession> for SessionInfo {
    fn from(s: &RegisteredSession) -> Self {
        SessionInfo {
            id: s.id.clone(),
            portal: s.portal.clone(),
            operation: s.operation.clone(),
            title: s.title.clone(),
            created: s.created,
            dir: s.dir.to_string_lossy().into_owned(),
        }
    }
}

impl SessionRegistry {
    /// Create a new session and register it atomically.
    ///
    /// Returns the session (for spawning + running) and the session ID.
    pub fn create_session(
        &mut self,
        portal: &str,
        operation: &str,
        options: &serde_json::Value,
        initial_entries: &[String],
        custom_bins: &HashMap<String, String>,
        title: Option<&str>,
    ) -> std::io::Result<Session> {
        let (sender, receiver) = std::sync::mpsc::channel();
        let control = SessionControl::new(sender.clone());

        let session = Session::new(
            portal,
            operation,
            options,
            initial_entries,
            custom_bins,
            sender,
            receiver,
        )?;

        self.register(RegisteredSession {
            id: session.id().to_string(),
            portal: portal.to_string(),
            operation: operation.to_string(),
            title: title.map(String::from),
            created: session.created(),
            dir: session.dir().to_path_buf(),
            control: Arc::new(control),
            initial_entries: initial_entries.to_vec(),
        });

        Ok(session)
    }

    fn register(&mut self, session: RegisteredSession) {
        info!(id = %session.id, portal = %session.portal, operation = %session.operation, "Registering session");
        self.sessions.insert(session.id.clone(), session);
    }

    pub fn unregister(&mut self, id: &str) {
        info!(id, "Unregistering session");
        self.sessions.remove(id);
    }

    pub fn get(&self, id: &str) -> Option<&RegisteredSession> {
        self.sessions.get(id)
    }

    pub fn iter(&self) -> impl Iterator<Item = &RegisteredSession> {
        self.sessions.values()
    }
}

/// Shared daemon state
#[derive(Default)]
pub struct DaemonState {
    pub sessions: SessionRegistry,
}

impl DaemonState {
    pub fn new() -> Self {
        Self::default()
    }
}

/// Daemon control socket server
pub struct DaemonSocket {
    state: Arc<RwLock<DaemonState>>,
    listener: UnixListener,
}

impl DaemonSocket {
    pub fn new(state: Arc<RwLock<DaemonState>>) -> std::io::Result<Self> {
        paths::ensure_base_dir()?;

        let sock_path = paths::daemon_socket_path();
        let _ = fs::remove_file(&sock_path);

        let listener = UnixListener::bind(&sock_path)?;
        info!(?sock_path, "Daemon socket listening");

        Ok(Self { state, listener })
    }

    pub fn spawn(self) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            for stream in self.listener.incoming() {
                match stream {
                    Ok(stream) => {
                        let state = Arc::clone(&self.state);
                        thread::spawn(move || {
                            if let Err(e) = handle_connection(stream, state) {
                                warn!("Connection error: {e}");
                            }
                        });
                    }
                    Err(e) => {
                        warn!("Accept error: {e}");
                    }
                }
            }
        })
    }
}

/// Daemon control FIFO for fire-and-forget commands
pub struct DaemonCtl {
    state: Arc<RwLock<DaemonState>>,
}

impl DaemonCtl {
    pub fn new(state: Arc<RwLock<DaemonState>>) -> std::io::Result<Self> {
        paths::ensure_base_dir()?;

        let ctl_path = paths::daemon_ctl_path();
        let _ = fs::remove_file(&ctl_path);

        std::os::unix::fs::mkfifo(&ctl_path, std::fs::Permissions::from_mode(0o600))?;

        info!(?ctl_path, "Daemon FIFO created");

        Ok(Self { state })
    }

    pub fn spawn(self) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            let ctl_path = paths::daemon_ctl_path();

            // Open with read+write to prevent EOF when all writers close
            let file = match fs::OpenOptions::new()
                .read(true)
                .write(true)
                .open(&ctl_path)
            {
                Ok(f) => f,
                Err(e) => {
                    warn!("Failed to open FIFO: {e}");
                    return;
                }
            };
            let reader = BufReader::new(file);

            info!("Daemon FIFO listening");

            for line in reader.lines() {
                match line {
                    Ok(line) => {
                        let line = line.trim().to_string();
                        if line.is_empty() {
                            continue;
                        }
                        match Request::decode(&line) {
                            Ok(req) => {
                                debug!(?req, "FIFO request");
                                let resp = handle_request(req, &self.state);
                                debug!(?resp, "FIFO response (discarded)");
                            }
                            Err(e) => {
                                warn!("FIFO parse error: {e}");
                            }
                        }
                    }
                    Err(e) => {
                        warn!("FIFO read error: {e}");
                        break;
                    }
                }
            }
        })
    }
}

fn handle_connection(
    stream: UnixStream,
    state: Arc<RwLock<DaemonState>>,
) -> Result<(), libportty::codec::IpcError> {
    let mut reader = BufReader::new(&stream);
    let mut writer = &stream;

    let req = read_request(&mut reader)?;
    debug!(?req, "Received daemon request");

    let resp = handle_request(req, &state);
    write_response(&mut writer, &resp)?;

    Ok(())
}

fn handle_request(req: Request, state: &Arc<RwLock<DaemonState>>) -> Response {
    match req {
        Request::Submit { session_id } => handle_submit(session_id, state),
        Request::Cancel { session_id } => handle_cancel(session_id, state),
        Request::Verify { session_id } => handle_verify(session_id, state),
        Request::Reset { session_id } => handle_reset(session_id, state),
        Request::List => handle_list(state),
    }
}

/// Submit: resolve session (by id or earliest), drain pending, signal submitted.
/// No session -> queue to submissions dir.
fn handle_submit(session_id: Option<String>, state: &Arc<RwLock<DaemonState>>) -> Response {
    let st = state.read().unwrap_or_else(|e| e.into_inner());

    let session = resolve_session(&st, session_id.as_deref());

    if let Some(session) = session {
        drain_pending_to(&session.dir);
        session.control.submit();
        info!(session_id = %session.id, "Signalled submit");
        Response::Ok
    } else {
        drop(st);
        move_pending_to_submissions()
    }
}

/// Cancel: resolve session, signal cancelled. No session -> clear pending.
fn handle_cancel(session_id: Option<String>, state: &Arc<RwLock<DaemonState>>) -> Response {
    let st = state.read().unwrap_or_else(|e| e.into_inner());

    let session = resolve_session(&st, session_id.as_deref());

    if let Some(session) = session {
        session.control.cancel();
        info!(session_id = %session.id, "Signalled cancel");
        Response::Ok
    } else {
        drop(st);
        let pending_sub = paths::pending_dir().join("submission");
        let _ = fs::write(&pending_sub, "");
        Response::Ok
    }
}

/// Verify: resolve session, read submission + options.json, validate.
fn handle_verify(session_id: Option<String>, state: &Arc<RwLock<DaemonState>>) -> Response {
    let st = state.read().unwrap_or_else(|e| e.into_inner());

    let session = match resolve_session(&st, session_id.as_deref()) {
        Some(s) => s,
        None => return Response::Error("No active session to verify".to_string()),
    };

    let session_dir = session.dir.clone();
    let portal = session.portal.clone();
    let operation = session.operation.clone();
    drop(st);

    // Read submission
    let entries = files::read_lines(&session_dir.join("submission"));

    // Read options.json
    let options: serde_json::Value = match fs::read_to_string(session_dir.join("options.json")) {
        Ok(json) => match serde_json::from_str(&json) {
            Ok(v) => v,
            Err(e) => return Response::Error(format!("Failed to parse options: {e}")),
        },
        Err(e) => return Response::Error(format!("Failed to read options: {e}")),
    };

    match portal::validate(&portal, &operation, &entries, &options) {
        Ok(_) => Response::Ok,
        Err(msg) => Response::Error(msg),
    }
}

/// Reset: resolve session, rewrite submission with initial entries.
fn handle_reset(session_id: Option<String>, state: &Arc<RwLock<DaemonState>>) -> Response {
    let st = state.read().unwrap_or_else(|e| e.into_inner());

    let session = match resolve_session(&st, session_id.as_deref()) {
        Some(s) => s,
        None => return Response::Error("No active session to reset".to_string()),
    };

    let sub_path = session.dir.join("submission");
    let entries = session.initial_entries.clone();
    let sid = session.id.clone();
    drop(st);

    match files::write_lines(&sub_path, &entries) {
        Ok(()) => {
            info!(session_id = %sid, "Reset submission to initial state");
            Response::Ok
        }
        Err(e) => Response::Error(format!("Failed to reset: {e}")),
    }
}

/// List all active sessions.
fn handle_list(state: &Arc<RwLock<DaemonState>>) -> Response {
    let st = state.read().unwrap_or_else(|e| e.into_inner());
    let sessions = st.sessions.iter().map(SessionInfo::from).collect();
    Response::Sessions(sessions)
}

/// Resolve a session: by ID, or earliest if None.
fn resolve_session<'a>(
    state: &'a DaemonState,
    session_id: Option<&str>,
) -> Option<&'a RegisteredSession> {
    match session_id {
        Some(id) => state.sessions.get(id),
        None => state.sessions.iter().min_by_key(|s| s.created),
    }
}

/// Move pending/submission -> submissions/<ts>-any/submission
fn move_pending_to_submissions() -> Response {
    let pending_sub = paths::pending_dir().join("submission");
    let entries = files::read_lines(&pending_sub);

    if entries.is_empty() {
        return Response::Error("No pending entries to submit".to_string());
    }

    let ts = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();

    let sub_dir = paths::submissions_dir().join(format!("{}-any", ts));
    if let Err(e) = fs::create_dir_all(&sub_dir) {
        return Response::Error(format!("Failed to create submission dir: {e}"));
    }

    if let Err(e) = files::write_lines(&sub_dir.join("submission"), &entries) {
        return Response::Error(format!("Failed to write submission: {e}"));
    }

    let _ = fs::write(&pending_sub, "");
    info!(entries = entries.len(), "Created submission");
    Response::Ok
}