sos_net/account/
remote.rs

1//! Connect a remote data source with a local account.
2use crate::Result;
3use async_trait::async_trait;
4use sos_account::LocalAccount;
5use sos_core::{AccountId, Origin};
6use sos_protocol::{
7    network_client::HttpClient, RemoteResult, RemoteSync, SyncClient,
8    SyncOptions,
9};
10use sos_remote_sync::{AutoMerge, RemoteSyncHandler};
11use sos_signer::ed25519::BoxedEd25519Signer;
12use sos_sync::{SyncDirection, UpdateSet};
13use std::{collections::HashMap, sync::Arc};
14use tokio::sync::Mutex;
15
16#[cfg(feature = "files")]
17use sos_protocol::transfer::{
18    FileOperation, FileSet, FileSyncClient, FileTransferQueueRequest,
19    FileTransferQueueSender, TransferOperation,
20};
21
22/// Collection of remote targets for synchronization.
23pub(crate) type Remotes = HashMap<Origin, RemoteBridge>;
24
25/// Bridge between a local account and a remote.
26#[derive(Clone)]
27pub struct RemoteBridge {
28    /// Account identifier.
29    account_id: AccountId,
30    /// Account so we can replay events
31    /// when a remote diff is merged.
32    pub(super) account: Arc<Mutex<LocalAccount>>,
33    /// Client to use for remote communication.
34    pub(crate) client: HttpClient,
35    // File transfers.
36    #[cfg(feature = "files")]
37    pub(crate) file_transfer_queue: FileTransferQueueSender,
38}
39
40impl RemoteBridge {
41    /// Create a new remote bridge that updates the given
42    /// local account.
43    pub fn new(
44        account_id: AccountId,
45        account: Arc<Mutex<LocalAccount>>,
46        origin: Origin,
47        device: BoxedEd25519Signer,
48        connection_id: String,
49    ) -> Result<Self> {
50        let client =
51            HttpClient::new(account_id, origin, device, connection_id)?;
52
53        #[cfg(feature = "files")]
54        let (file_transfer_queue, _) =
55            tokio::sync::broadcast::channel::<FileTransferQueueRequest>(32);
56
57        Ok(Self {
58            account_id,
59            account,
60            client,
61            #[cfg(feature = "files")]
62            file_transfer_queue,
63        })
64    }
65}
66
67#[async_trait]
68impl RemoteSyncHandler for RemoteBridge {
69    type Client = HttpClient;
70    type Account = LocalAccount;
71    type Error = crate::Error;
72
73    fn direction(&self) -> SyncDirection {
74        SyncDirection::Push
75    }
76
77    fn client(&self) -> &Self::Client {
78        &self.client
79    }
80
81    fn origin(&self) -> &Origin {
82        self.client.origin()
83    }
84
85    fn account_id(&self) -> &AccountId {
86        &self.account_id
87    }
88
89    fn account(&self) -> Arc<Mutex<Self::Account>> {
90        self.account.clone()
91    }
92
93    #[cfg(feature = "files")]
94    fn file_transfer_queue(&self) -> &FileTransferQueueSender {
95        &self.file_transfer_queue
96    }
97
98    #[cfg(feature = "files")]
99    async fn execute_sync_file_transfers(&self) -> Result<()> {
100        use sos_sync::StorageEventLogs;
101        let external_files = {
102            let account = self.account();
103            let account = account.lock().await;
104            account.canonical_files().await?
105        };
106
107        tracing::debug!(
108            canonical_len = %external_files.len(),
109            "sync_file_transfers",
110        );
111
112        let file_set = FileSet(external_files);
113        let file_transfers = self.client().compare_files(file_set).await?;
114
115        tracing::debug!(
116            uploads_len = %file_transfers.uploads.0.len(),
117            downloads_len = %file_transfers.downloads.0.len(),
118            "sync_file_transfers",
119        );
120
121        let mut ops = Vec::new();
122        for file in file_transfers.uploads.0 {
123            ops.push(FileOperation(file, TransferOperation::Upload));
124        }
125
126        for file in file_transfers.downloads.0 {
127            ops.push(FileOperation(file, TransferOperation::Download));
128        }
129
130        tracing::debug!(
131            operations_len = %ops.len(),
132            receiver_count = %self.file_transfer_queue.receiver_count(),
133            "sync_file_transfers",
134        );
135
136        if !ops.is_empty() && self.file_transfer_queue.receiver_count() > 0 {
137            let _ = self.file_transfer_queue.send(ops);
138        }
139
140        Ok(())
141    }
142}
143
144#[async_trait]
145impl AutoMerge for RemoteBridge {}
146
147#[async_trait]
148impl RemoteSync for RemoteBridge {
149    type Error = crate::Error;
150
151    async fn sync(&self) -> RemoteResult<Self::Error> {
152        self.sync_with_options(&Default::default()).await
153    }
154
155    async fn sync_with_options(
156        &self,
157        options: &SyncOptions,
158    ) -> RemoteResult<Self::Error> {
159        match self.execute_sync(options).await {
160            Ok(outcome) => RemoteResult {
161                origin: self.origin().clone(),
162                result: Ok(outcome),
163            },
164            Err(e) => RemoteResult {
165                origin: self.origin().clone(),
166                result: Err(e),
167            },
168        }
169    }
170
171    async fn force_update(
172        &self,
173        account_data: UpdateSet,
174    ) -> RemoteResult<Self::Error> {
175        match self.client.update_account(account_data).await {
176            Ok(_) => RemoteResult {
177                origin: self.origin().clone(),
178                result: Ok(None),
179            },
180            Err(e) => RemoteResult {
181                origin: self.origin().clone(),
182                result: Err(e.into()),
183            },
184        }
185    }
186
187    #[cfg(feature = "files")]
188    async fn sync_file_transfers(&self) -> RemoteResult<Self::Error> {
189        match self.execute_sync_file_transfers().await {
190            Ok(_) => RemoteResult {
191                origin: self.origin().clone(),
192                result: Ok(None),
193            },
194            Err(e) => RemoteResult {
195                origin: self.origin().clone(),
196                result: Err(e),
197            },
198        }
199    }
200}
201
#[cfg(feature = "listen")]
mod listen {
    use crate::RemoteBridge;
    use sos_protocol::{
        network_client::{ListenOptions, WebSocketHandle},
        NetworkChangeEvent,
    };
    use tokio::sync::mpsc;

    // Forward change notifications from the remote to a channel.
    #[cfg(not(target_arch = "wasm32"))]
    impl RemoteBridge {
        /// Start listening for change notifications from the remote
        /// server and forward each one to `channel`.
        ///
        /// Every notification is logged at debug level before it is
        /// sent; a failed send is ignored.
        ///
        /// Always generate a new keypair for this websocket
        /// connection: it must not be the same as the one used by
        /// the main client, otherwise transports will collide on
        /// the server as they are identified by public key.
        pub(crate) fn listen(
            &self,
            options: ListenOptions,
            channel: mpsc::Sender<NetworkChangeEvent>,
        ) -> WebSocketHandle {
            self.client.listen(options, move |event| {
                // Each invocation gets its own sender so the
                // future below can own it.
                let sender = channel.clone();
                async move {
                    tracing::debug!(notification = ?event);
                    let _ = sender.send(event).await;
                }
            })
        }
    }
}