Skip to main content

lb_rs/ipc/
server.rs

1use std::io;
2use std::sync::Arc;
3
4use serde::Serialize;
5use tokio::net::unix::OwnedWriteHalf;
6use tokio::net::{UnixListener, UnixStream};
7use tokio::sync::Mutex;
8use tokio::sync::broadcast::error::RecvError;
9
10use crate::LocalLb;
11use crate::ipc::protocol::{Frame, Request};
12use crate::model::errors::LbResult;
13
14pub async fn serve(listener: UnixListener, lb: LocalLb) {
15    loop {
16        match listener.accept().await {
17            Ok((stream, _addr)) => {
18                let lb = lb.clone();
19                tokio::spawn(async move {
20                    if let Err(err) = handle_conn(stream, lb).await {
21                        if err.kind() == io::ErrorKind::UnexpectedEof {
22                            tracing::debug!("ipc guest disconnected");
23                        } else {
24                            tracing::warn!(?err, "ipc connection ended");
25                        }
26                    }
27                });
28            }
29            Err(err) => {
30                tracing::error!(?err, "ipc accept failed; aborting serve loop");
31                return;
32            }
33        }
34    }
35}
36
37async fn handle_conn(stream: UnixStream, lb: LocalLb) -> io::Result<()> {
38    let (mut reader, write_half) = stream.into_split();
39    let writer = Arc::new(Mutex::new(write_half));
40
41    loop {
42        let frame = Frame::read(&mut reader).await?;
43        match frame {
44            Frame::Request { seq, body: Request::Subscribe } => {
45                let lb_for_task = lb.clone();
46                let writer_for_task = Arc::clone(&writer);
47                tokio::spawn(forward_events(lb_for_task, writer_for_task, seq));
48                send_response(&writer, seq, enc_plain(())).await?;
49            }
50            Frame::Request { seq, body } => {
51                let output = dispatch(&lb, body).await;
52                send_response(&writer, seq, output).await?;
53            }
54            Frame::Response { .. } | Frame::Event { .. } | Frame::EventEnd { .. } => {
55                return Err(io::Error::new(
56                    io::ErrorKind::InvalidData,
57                    "guest sent a host-only frame",
58                ));
59            }
60        }
61    }
62}
63
64async fn send_response(
65    writer: &Arc<Mutex<OwnedWriteHalf>>, seq: u64, output: Vec<u8>,
66) -> io::Result<()> {
67    let response = Frame::Response { seq, output };
68    let mut w = writer.lock().await;
69    response.write(&mut *w).await
70}
71
72async fn forward_events(lb: LocalLb, writer: Arc<Mutex<OwnedWriteHalf>>, stream_seq: u64) {
73    let mut rx = lb.subscribe();
74    loop {
75        match rx.recv().await {
76            Ok(event) => {
77                let frame = Frame::Event { stream_seq, body: event };
78                let mut w = writer.lock().await;
79                if let Err(err) = frame.write(&mut *w).await {
80                    tracing::debug!(?err, "ipc: event forward failed");
81                    return;
82                }
83            }
84            Err(RecvError::Lagged(n)) => {
85                tracing::warn!(skipped = n, "ipc: event subscriber lagged");
86                continue;
87            }
88            Err(RecvError::Closed) => break,
89        }
90    }
91
92    let frame = Frame::EventEnd { stream_seq };
93    let mut w = writer.lock().await;
94    let _ = frame.write(&mut *w).await;
95}
96
97fn enc<Out: Serialize>(result: LbResult<Out>) -> Vec<u8> {
98    bincode::serialize(&result).unwrap_or_else(|e| {
99        tracing::error!(?e, "ipc: serialize response failed");
100        Vec::new()
101    })
102}
103
104fn enc_plain<Out: Serialize>(value: Out) -> Vec<u8> {
105    enc::<Out>(Ok(value))
106}
107
108pub(crate) async fn dispatch(lb: &LocalLb, req: Request) -> Vec<u8> {
109    match req {
110        Request::CreateAccount { username, api_url, welcome_doc } => {
111            enc(lb.create_account(&username, &api_url, welcome_doc).await)
112        }
113        Request::ImportAccount { key, api_url } => {
114            enc(lb.import_account(&key, api_url.as_deref()).await)
115        }
116        Request::ImportAccountPrivateKeyV1 { account } => {
117            enc(lb.import_account_private_key_v1(account).await)
118        }
119        Request::ImportAccountPhrase { phrase, api_url } => {
120            match <[String; 24]>::try_from(phrase) {
121                Ok(owned) => {
122                    let refs: [&str; 24] = std::array::from_fn(|i| owned[i].as_str());
123                    enc(lb.import_account_phrase(refs, &api_url).await)
124                }
125                Err(v) => enc::<crate::model::account::Account>(Err(
126                    crate::model::errors::LbErrKind::Unexpected(format!(
127                        "ipc: import_account_phrase expected 24 words, got {}",
128                        v.len()
129                    ))
130                    .into(),
131                )),
132            }
133        }
134        Request::DeleteAccount => enc(lb.delete_account().await),
135        Request::GetAccount => enc(lb.get_account().cloned()),
136
137        Request::SuggestedDocs { settings } => enc(lb.suggested_docs(settings).await),
138        Request::ClearSuggested => enc(lb.clear_suggested().await),
139        Request::ClearSuggestedId { id } => enc(lb.clear_suggested_id(id).await),
140        Request::AppForegrounded => {
141            lb.app_foregrounded();
142            enc_plain(())
143        }
144
145        Request::DisappearAccount { username } => enc(lb.disappear_account(&username).await),
146        Request::DisappearFile { id } => enc(lb.disappear_file(id).await),
147        Request::ListUsers { filter } => enc(lb.list_users(filter).await),
148        Request::GetAccountInfo { identifier } => enc(lb.get_account_info(identifier).await),
149        Request::AdminValidateAccount { username } => enc(lb.validate_account(&username).await),
150        Request::AdminValidateServer => enc(lb.validate_server().await),
151        Request::AdminFileInfo { id } => enc(lb.file_info(id).await),
152        Request::RebuildIndex { index } => enc(lb.rebuild_index(index).await),
153        Request::SetUserTier { username, info } => enc(lb.set_user_tier(&username, info).await),
154
155        Request::UpgradeAccountStripe { account_tier } => {
156            enc(lb.upgrade_account_stripe(account_tier).await)
157        }
158        Request::UpgradeAccountGooglePlay { purchase_token, account_id } => enc(lb
159            .upgrade_account_google_play(&purchase_token, &account_id)
160            .await),
161        Request::UpgradeAccountAppStore { original_transaction_id, app_account_token } => enc(lb
162            .upgrade_account_app_store(original_transaction_id, app_account_token)
163            .await),
164        Request::CancelSubscription => enc(lb.cancel_subscription().await),
165        Request::GetSubscriptionInfo => enc(lb.get_subscription_info().await),
166
167        #[cfg(not(target_family = "wasm"))]
168        Request::RecentPanic => enc(lb.recent_panic().await),
169        #[cfg(not(target_family = "wasm"))]
170        Request::WritePanicToFile { error_header, bt } => {
171            enc(lb.write_panic_to_file(error_header, bt).await)
172        }
173        #[cfg(not(target_family = "wasm"))]
174        Request::DebugInfo { os_info, check_docs } => enc(lb.debug_info(os_info, check_docs).await),
175
176        Request::ReadDocument { id, user_activity } => {
177            enc(lb.read_document(id, user_activity).await)
178        }
179        Request::WriteDocument { id, content } => enc(lb.write_document(id, &content).await),
180        Request::ReadDocumentWithHmac { id, user_activity } => {
181            enc(lb.read_document_with_hmac(id, user_activity).await)
182        }
183        Request::SafeWrite { id, old_hmac, content } => {
184            enc(lb.safe_write(id, old_hmac, content).await)
185        }
186
187        Request::CreateFile { name, parent, file_type } => {
188            enc(lb.create_file(&name, &parent, file_type).await)
189        }
190        Request::RenameFile { id, new_name } => enc(lb.rename_file(&id, &new_name).await),
191        Request::MoveFile { id, new_parent } => enc(lb.move_file(&id, &new_parent).await),
192        Request::Delete { id } => enc(lb.delete(&id).await),
193        Request::Root => enc(lb.root().await),
194        Request::ListMetadatas => enc(lb.list_metadatas().await),
195        Request::GetChildren { id } => enc(lb.get_children(&id).await),
196        Request::GetAndGetChildrenRecursively { id } => {
197            enc(lb.get_and_get_children_recursively(&id).await)
198        }
199        Request::GetFileById { id } => enc(lb.get_file_by_id(id).await),
200        Request::GetFileLinkUrl { id } => enc(lb.get_file_link_url(id).await),
201        Request::LocalChanges => enc_plain(lb.local_changes().await),
202
203        Request::TestRepoIntegrity { check_docs } => enc(lb.test_repo_integrity(check_docs).await),
204
205        Request::CreateLinkAtPath { path, target_id } => {
206            enc(lb.create_link_at_path(&path, target_id).await)
207        }
208        Request::CreateAtPath { path } => enc(lb.create_at_path(&path).await),
209        Request::GetByPath { path } => enc(lb.get_by_path(&path).await),
210        Request::GetPathById { id } => enc(lb.get_path_by_id(id).await),
211        Request::ListPaths { filter } => enc(lb.list_paths(filter).await),
212        Request::ListPathsWithIds { filter } => enc(lb.list_paths_with_ids(filter).await),
213
214        Request::ShareFile { id, username, mode } => enc(lb.share_file(id, &username, mode).await),
215        Request::GetPendingShares => enc(lb.get_pending_shares().await),
216        Request::GetPendingShareFiles => enc(lb.get_pending_share_files().await),
217        Request::KnownUsernames => enc(lb.known_usernames().await),
218        Request::RejectShare { id } => enc(lb.reject_share(&id).await),
219
220        Request::PinFile { id } => enc(lb.pin_file(id).await),
221        Request::UnpinFile { id } => enc(lb.unpin_file(id).await),
222        Request::ListPinned => enc(lb.list_pinned().await),
223
224        Request::GetUsage => enc(lb.get_usage().await),
225
226        Request::Sync => enc(lb.sync().await),
227        Request::Status => enc_plain(lb.status().await),
228        Request::GetLastSynced => enc(lb.get_last_synced().await),
229        Request::GetLastSyncedHuman => enc(lb.get_last_synced_human().await),
230        Request::Subscribe => unreachable!("handle_conn special-cases Subscribe"),
231        #[cfg(not(target_family = "wasm"))]
232        Request::BuildIndex => enc(lb.build_index().await),
233        #[cfg(not(target_family = "wasm"))]
234        Request::ReloadSearchIndex => enc(lb.reload_search_index()),
235        #[cfg(not(target_family = "wasm"))]
236        Request::Search { input, cfg } => enc(lb.search(&input, cfg).await),
237    }
238}