1use std::io;
2use std::sync::Arc;
3
4use serde::Serialize;
5use tokio::net::unix::OwnedWriteHalf;
6use tokio::net::{UnixListener, UnixStream};
7use tokio::sync::Mutex;
8use tokio::sync::broadcast::error::RecvError;
9
10use crate::LocalLb;
11use crate::ipc::protocol::{Frame, Request};
12use crate::model::errors::LbResult;
13
14pub async fn serve(listener: UnixListener, lb: LocalLb) {
15 loop {
16 match listener.accept().await {
17 Ok((stream, _addr)) => {
18 let lb = lb.clone();
19 tokio::spawn(async move {
20 if let Err(err) = handle_conn(stream, lb).await {
21 if err.kind() == io::ErrorKind::UnexpectedEof {
22 tracing::debug!("ipc guest disconnected");
23 } else {
24 tracing::warn!(?err, "ipc connection ended");
25 }
26 }
27 });
28 }
29 Err(err) => {
30 tracing::error!(?err, "ipc accept failed; aborting serve loop");
31 return;
32 }
33 }
34 }
35}
36
37async fn handle_conn(stream: UnixStream, lb: LocalLb) -> io::Result<()> {
38 let (mut reader, write_half) = stream.into_split();
39 let writer = Arc::new(Mutex::new(write_half));
40
41 loop {
42 let frame = Frame::read(&mut reader).await?;
43 match frame {
44 Frame::Request { seq, body: Request::Subscribe } => {
45 let lb_for_task = lb.clone();
46 let writer_for_task = Arc::clone(&writer);
47 tokio::spawn(forward_events(lb_for_task, writer_for_task, seq));
48 send_response(&writer, seq, enc_plain(())).await?;
49 }
50 Frame::Request { seq, body } => {
51 let output = dispatch(&lb, body).await;
52 send_response(&writer, seq, output).await?;
53 }
54 Frame::Response { .. } | Frame::Event { .. } | Frame::EventEnd { .. } => {
55 return Err(io::Error::new(
56 io::ErrorKind::InvalidData,
57 "guest sent a host-only frame",
58 ));
59 }
60 }
61 }
62}
63
64async fn send_response(
65 writer: &Arc<Mutex<OwnedWriteHalf>>, seq: u64, output: Vec<u8>,
66) -> io::Result<()> {
67 let response = Frame::Response { seq, output };
68 let mut w = writer.lock().await;
69 response.write(&mut *w).await
70}
71
72async fn forward_events(lb: LocalLb, writer: Arc<Mutex<OwnedWriteHalf>>, stream_seq: u64) {
73 let mut rx = lb.subscribe();
74 loop {
75 match rx.recv().await {
76 Ok(event) => {
77 let frame = Frame::Event { stream_seq, body: event };
78 let mut w = writer.lock().await;
79 if let Err(err) = frame.write(&mut *w).await {
80 tracing::debug!(?err, "ipc: event forward failed");
81 return;
82 }
83 }
84 Err(RecvError::Lagged(n)) => {
85 tracing::warn!(skipped = n, "ipc: event subscriber lagged");
86 continue;
87 }
88 Err(RecvError::Closed) => break,
89 }
90 }
91
92 let frame = Frame::EventEnd { stream_seq };
93 let mut w = writer.lock().await;
94 let _ = frame.write(&mut *w).await;
95}
96
97fn enc<Out: Serialize>(result: LbResult<Out>) -> Vec<u8> {
98 bincode::serialize(&result).unwrap_or_else(|e| {
99 tracing::error!(?e, "ipc: serialize response failed");
100 Vec::new()
101 })
102}
103
104fn enc_plain<Out: Serialize>(value: Out) -> Vec<u8> {
105 enc::<Out>(Ok(value))
106}
107
108pub(crate) async fn dispatch(lb: &LocalLb, req: Request) -> Vec<u8> {
109 match req {
110 Request::CreateAccount { username, api_url, welcome_doc } => {
111 enc(lb.create_account(&username, &api_url, welcome_doc).await)
112 }
113 Request::ImportAccount { key, api_url } => {
114 enc(lb.import_account(&key, api_url.as_deref()).await)
115 }
116 Request::ImportAccountPrivateKeyV1 { account } => {
117 enc(lb.import_account_private_key_v1(account).await)
118 }
119 Request::ImportAccountPhrase { phrase, api_url } => {
120 match <[String; 24]>::try_from(phrase) {
121 Ok(owned) => {
122 let refs: [&str; 24] = std::array::from_fn(|i| owned[i].as_str());
123 enc(lb.import_account_phrase(refs, &api_url).await)
124 }
125 Err(v) => enc::<crate::model::account::Account>(Err(
126 crate::model::errors::LbErrKind::Unexpected(format!(
127 "ipc: import_account_phrase expected 24 words, got {}",
128 v.len()
129 ))
130 .into(),
131 )),
132 }
133 }
134 Request::DeleteAccount => enc(lb.delete_account().await),
135 Request::GetAccount => enc(lb.get_account().cloned()),
136
137 Request::SuggestedDocs { settings } => enc(lb.suggested_docs(settings).await),
138 Request::ClearSuggested => enc(lb.clear_suggested().await),
139 Request::ClearSuggestedId { id } => enc(lb.clear_suggested_id(id).await),
140 Request::AppForegrounded => {
141 lb.app_foregrounded();
142 enc_plain(())
143 }
144
145 Request::DisappearAccount { username } => enc(lb.disappear_account(&username).await),
146 Request::DisappearFile { id } => enc(lb.disappear_file(id).await),
147 Request::ListUsers { filter } => enc(lb.list_users(filter).await),
148 Request::GetAccountInfo { identifier } => enc(lb.get_account_info(identifier).await),
149 Request::AdminValidateAccount { username } => enc(lb.validate_account(&username).await),
150 Request::AdminValidateServer => enc(lb.validate_server().await),
151 Request::AdminFileInfo { id } => enc(lb.file_info(id).await),
152 Request::RebuildIndex { index } => enc(lb.rebuild_index(index).await),
153 Request::SetUserTier { username, info } => enc(lb.set_user_tier(&username, info).await),
154
155 Request::UpgradeAccountStripe { account_tier } => {
156 enc(lb.upgrade_account_stripe(account_tier).await)
157 }
158 Request::UpgradeAccountGooglePlay { purchase_token, account_id } => enc(lb
159 .upgrade_account_google_play(&purchase_token, &account_id)
160 .await),
161 Request::UpgradeAccountAppStore { original_transaction_id, app_account_token } => enc(lb
162 .upgrade_account_app_store(original_transaction_id, app_account_token)
163 .await),
164 Request::CancelSubscription => enc(lb.cancel_subscription().await),
165 Request::GetSubscriptionInfo => enc(lb.get_subscription_info().await),
166
167 #[cfg(not(target_family = "wasm"))]
168 Request::RecentPanic => enc(lb.recent_panic().await),
169 #[cfg(not(target_family = "wasm"))]
170 Request::WritePanicToFile { error_header, bt } => {
171 enc(lb.write_panic_to_file(error_header, bt).await)
172 }
173 #[cfg(not(target_family = "wasm"))]
174 Request::DebugInfo { os_info, check_docs } => enc(lb.debug_info(os_info, check_docs).await),
175
176 Request::ReadDocument { id, user_activity } => {
177 enc(lb.read_document(id, user_activity).await)
178 }
179 Request::WriteDocument { id, content } => enc(lb.write_document(id, &content).await),
180 Request::ReadDocumentWithHmac { id, user_activity } => {
181 enc(lb.read_document_with_hmac(id, user_activity).await)
182 }
183 Request::SafeWrite { id, old_hmac, content } => {
184 enc(lb.safe_write(id, old_hmac, content).await)
185 }
186
187 Request::CreateFile { name, parent, file_type } => {
188 enc(lb.create_file(&name, &parent, file_type).await)
189 }
190 Request::RenameFile { id, new_name } => enc(lb.rename_file(&id, &new_name).await),
191 Request::MoveFile { id, new_parent } => enc(lb.move_file(&id, &new_parent).await),
192 Request::Delete { id } => enc(lb.delete(&id).await),
193 Request::Root => enc(lb.root().await),
194 Request::ListMetadatas => enc(lb.list_metadatas().await),
195 Request::GetChildren { id } => enc(lb.get_children(&id).await),
196 Request::GetAndGetChildrenRecursively { id } => {
197 enc(lb.get_and_get_children_recursively(&id).await)
198 }
199 Request::GetFileById { id } => enc(lb.get_file_by_id(id).await),
200 Request::GetFileLinkUrl { id } => enc(lb.get_file_link_url(id).await),
201 Request::LocalChanges => enc_plain(lb.local_changes().await),
202
203 Request::TestRepoIntegrity { check_docs } => enc(lb.test_repo_integrity(check_docs).await),
204
205 Request::CreateLinkAtPath { path, target_id } => {
206 enc(lb.create_link_at_path(&path, target_id).await)
207 }
208 Request::CreateAtPath { path } => enc(lb.create_at_path(&path).await),
209 Request::GetByPath { path } => enc(lb.get_by_path(&path).await),
210 Request::GetPathById { id } => enc(lb.get_path_by_id(id).await),
211 Request::ListPaths { filter } => enc(lb.list_paths(filter).await),
212 Request::ListPathsWithIds { filter } => enc(lb.list_paths_with_ids(filter).await),
213
214 Request::ShareFile { id, username, mode } => enc(lb.share_file(id, &username, mode).await),
215 Request::GetPendingShares => enc(lb.get_pending_shares().await),
216 Request::GetPendingShareFiles => enc(lb.get_pending_share_files().await),
217 Request::KnownUsernames => enc(lb.known_usernames().await),
218 Request::RejectShare { id } => enc(lb.reject_share(&id).await),
219
220 Request::PinFile { id } => enc(lb.pin_file(id).await),
221 Request::UnpinFile { id } => enc(lb.unpin_file(id).await),
222 Request::ListPinned => enc(lb.list_pinned().await),
223
224 Request::GetUsage => enc(lb.get_usage().await),
225
226 Request::Sync => enc(lb.sync().await),
227 Request::Status => enc_plain(lb.status().await),
228 Request::GetLastSynced => enc(lb.get_last_synced().await),
229 Request::GetLastSyncedHuman => enc(lb.get_last_synced_human().await),
230 Request::Subscribe => unreachable!("handle_conn special-cases Subscribe"),
231 #[cfg(not(target_family = "wasm"))]
232 Request::BuildIndex => enc(lb.build_index().await),
233 #[cfg(not(target_family = "wasm"))]
234 Request::ReloadSearchIndex => enc(lb.reload_search_index()),
235 #[cfg(not(target_family = "wasm"))]
236 Request::Search { input, cfg } => enc(lb.search(&input, cfg).await),
237 }
238}