sos_net/account/
remote.rs1use crate::Result;
3use async_trait::async_trait;
4use sos_account::LocalAccount;
5use sos_core::{AccountId, Origin};
6use sos_protocol::{
7 network_client::HttpClient, RemoteResult, RemoteSync, SyncClient,
8 SyncOptions,
9};
10use sos_remote_sync::{AutoMerge, RemoteSyncHandler};
11use sos_signer::ed25519::BoxedEd25519Signer;
12use sos_sync::{SyncDirection, UpdateSet};
13use std::{collections::HashMap, sync::Arc};
14use tokio::sync::Mutex;
15
16#[cfg(feature = "files")]
17use sos_protocol::transfer::{
18 FileOperation, FileSet, FileSyncClient, FileTransferQueueRequest,
19 FileTransferQueueSender, TransferOperation,
20};
21
/// Map of remote bridges keyed by the origin they connect to.
pub(crate) type Remotes = HashMap<Origin, RemoteBridge>;
24
/// Pairs a shared [`LocalAccount`] with the [`HttpClient`] used to
/// communicate with a single remote origin.
///
/// Cloning is cheap with respect to the account: clones share the same
/// `Arc<Mutex<LocalAccount>>`.
#[derive(Clone)]
pub struct RemoteBridge {
    /// Identifier of the account; also handed to the HTTP client on
    /// construction.
    account_id: AccountId,
    /// Local account, shared behind an async mutex.
    pub(super) account: Arc<Mutex<LocalAccount>>,
    /// HTTP client used for requests to the remote origin.
    pub(crate) client: HttpClient,
    /// Sender side of the broadcast channel on which file transfer
    /// operations are queued (only with the `files` feature).
    #[cfg(feature = "files")]
    pub(crate) file_transfer_queue: FileTransferQueueSender,
}
39
40impl RemoteBridge {
41 pub fn new(
44 account_id: AccountId,
45 account: Arc<Mutex<LocalAccount>>,
46 origin: Origin,
47 device: BoxedEd25519Signer,
48 connection_id: String,
49 ) -> Result<Self> {
50 let client =
51 HttpClient::new(account_id, origin, device, connection_id)?;
52
53 #[cfg(feature = "files")]
54 let (file_transfer_queue, _) =
55 tokio::sync::broadcast::channel::<FileTransferQueueRequest>(32);
56
57 Ok(Self {
58 account_id,
59 account,
60 client,
61 #[cfg(feature = "files")]
62 file_transfer_queue,
63 })
64 }
65}
66
67#[async_trait]
68impl RemoteSyncHandler for RemoteBridge {
69 type Client = HttpClient;
70 type Account = LocalAccount;
71 type Error = crate::Error;
72
73 fn direction(&self) -> SyncDirection {
74 SyncDirection::Push
75 }
76
77 fn client(&self) -> &Self::Client {
78 &self.client
79 }
80
81 fn origin(&self) -> &Origin {
82 self.client.origin()
83 }
84
85 fn account_id(&self) -> &AccountId {
86 &self.account_id
87 }
88
89 fn account(&self) -> Arc<Mutex<Self::Account>> {
90 self.account.clone()
91 }
92
93 #[cfg(feature = "files")]
94 fn file_transfer_queue(&self) -> &FileTransferQueueSender {
95 &self.file_transfer_queue
96 }
97
98 #[cfg(feature = "files")]
99 async fn execute_sync_file_transfers(&self) -> Result<()> {
100 use sos_sync::StorageEventLogs;
101 let external_files = {
102 let account = self.account();
103 let account = account.lock().await;
104 account.canonical_files().await?
105 };
106
107 tracing::debug!(
108 canonical_len = %external_files.len(),
109 "sync_file_transfers",
110 );
111
112 let file_set = FileSet(external_files);
113 let file_transfers = self.client().compare_files(file_set).await?;
114
115 tracing::debug!(
116 uploads_len = %file_transfers.uploads.0.len(),
117 downloads_len = %file_transfers.downloads.0.len(),
118 "sync_file_transfers",
119 );
120
121 let mut ops = Vec::new();
122 for file in file_transfers.uploads.0 {
123 ops.push(FileOperation(file, TransferOperation::Upload));
124 }
125
126 for file in file_transfers.downloads.0 {
127 ops.push(FileOperation(file, TransferOperation::Download));
128 }
129
130 tracing::debug!(
131 operations_len = %ops.len(),
132 receiver_count = %self.file_transfer_queue.receiver_count(),
133 "sync_file_transfers",
134 );
135
136 if !ops.is_empty() && self.file_transfer_queue.receiver_count() > 0 {
137 let _ = self.file_transfer_queue.send(ops);
138 }
139
140 Ok(())
141 }
142}
143
// Empty impl: `RemoteBridge` overrides nothing and relies entirely on
// whatever default method implementations `AutoMerge` provides.
#[async_trait]
impl AutoMerge for RemoteBridge {}
146
147#[async_trait]
148impl RemoteSync for RemoteBridge {
149 type Error = crate::Error;
150
151 async fn sync(&self) -> RemoteResult<Self::Error> {
152 self.sync_with_options(&Default::default()).await
153 }
154
155 async fn sync_with_options(
156 &self,
157 options: &SyncOptions,
158 ) -> RemoteResult<Self::Error> {
159 match self.execute_sync(options).await {
160 Ok(outcome) => RemoteResult {
161 origin: self.origin().clone(),
162 result: Ok(outcome),
163 },
164 Err(e) => RemoteResult {
165 origin: self.origin().clone(),
166 result: Err(e),
167 },
168 }
169 }
170
171 async fn force_update(
172 &self,
173 account_data: UpdateSet,
174 ) -> RemoteResult<Self::Error> {
175 match self.client.update_account(account_data).await {
176 Ok(_) => RemoteResult {
177 origin: self.origin().clone(),
178 result: Ok(None),
179 },
180 Err(e) => RemoteResult {
181 origin: self.origin().clone(),
182 result: Err(e.into()),
183 },
184 }
185 }
186
187 #[cfg(feature = "files")]
188 async fn sync_file_transfers(&self) -> RemoteResult<Self::Error> {
189 match self.execute_sync_file_transfers().await {
190 Ok(_) => RemoteResult {
191 origin: self.origin().clone(),
192 result: Ok(None),
193 },
194 Err(e) => RemoteResult {
195 origin: self.origin().clone(),
196 result: Err(e),
197 },
198 }
199 }
200}
201
#[cfg(feature = "listen")]
mod listen {
    use crate::RemoteBridge;
    use sos_protocol::{
        network_client::{ListenOptions, WebSocketHandle},
        NetworkChangeEvent,
    };
    use tokio::sync::mpsc;

    #[cfg(not(target_arch = "wasm32"))]
    impl RemoteBridge {
        /// Start listening for change notifications through the client
        /// and forward each one to `channel`.
        ///
        /// Every notification is logged at debug level before being
        /// forwarded; a failed send on `channel` is ignored. Returns the
        /// handle produced by the client's `listen` call.
        pub(crate) fn listen(
            &self,
            options: ListenOptions,
            channel: mpsc::Sender<NetworkChangeEvent>,
        ) -> WebSocketHandle {
            self.client.listen(options, move |notification| {
                // Each invocation gets its own sender so the returned
                // future owns everything it needs.
                let sender = channel.clone();
                async move {
                    tracing::debug!(notification = ?notification);
                    let _ = sender.send(notification).await;
                }
            })
        }
    }
}