1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::Mutex;
4
5use rmpv::Value;
6use rpc_runtime_client::RpcClient;
7use rpc_runtime_core::{InstanceId, MethodId};
8use rpc_runtime_errors::{RuntimeError, RuntimeErrorCode};
9use tokio::sync::broadcast;
10
11use super::services::*;
12use super::types::*;
13
14#[derive(Clone)]
15pub struct ArchiveServiceClient {
16 inner: Arc<ArchiveServiceClientInner>,
17}
18
19struct ArchiveServiceClientInner {
20 client: RpcClient,
21 instance_id: InstanceId,
22}
23
24impl ArchiveServiceClient {
25 pub fn instance_id(&self) -> InstanceId {
26 self.inner.instance_id
27 }
28 pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
29 Self {
30 inner: Arc::new(ArchiveServiceClientInner {
31 client,
32 instance_id,
33 }),
34 }
35 }
36 pub async fn resolve_named(
37 client: RpcClient,
38 name: impl Into<String>,
39 ) -> Result<Self, RuntimeError> {
40 let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
41 let id = ids.pop().unwrap_or(0);
42 let instance_id = InstanceId::new(id).ok_or_else(|| {
43 RuntimeError::runtime(
44 RuntimeErrorCode::InstanceNotFound,
45 "named instance was not found",
46 )
47 })?;
48 Ok(Self::for_instance(client, instance_id))
49 }
50 pub async fn zip(&self, request: &ZipRequest) -> Result<Empty, RuntimeError> {
51 let payload = encode_zip_request(request);
52 let response = self
53 .inner
54 .client
55 .call(
56 self.inner.instance_id,
57 MethodId::new(ARCHIVE_SERVICE_ZIP_METHOD_ID),
58 payload,
59 )
60 .await?;
61 decode_empty(&response)
62 }
63 pub async fn unzip(&self, request: &UnzipRequest) -> Result<Empty, RuntimeError> {
64 let payload = encode_unzip_request(request);
65 let response = self
66 .inner
67 .client
68 .call(
69 self.inner.instance_id,
70 MethodId::new(ARCHIVE_SERVICE_UNZIP_METHOD_ID),
71 payload,
72 )
73 .await?;
74 decode_empty(&response)
75 }
76}
77
78#[derive(Clone)]
79pub struct FileSystemServiceClient {
80 inner: Arc<FileSystemServiceClientInner>,
81}
82
83struct FileSystemServiceClientInner {
84 client: RpcClient,
85 instance_id: InstanceId,
86}
87
88impl FileSystemServiceClient {
89 pub fn instance_id(&self) -> InstanceId {
90 self.inner.instance_id
91 }
92 pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
93 Self {
94 inner: Arc::new(FileSystemServiceClientInner {
95 client,
96 instance_id,
97 }),
98 }
99 }
100 pub async fn resolve_named(
101 client: RpcClient,
102 name: impl Into<String>,
103 ) -> Result<Self, RuntimeError> {
104 let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
105 let id = ids.pop().unwrap_or(0);
106 let instance_id = InstanceId::new(id).ok_or_else(|| {
107 RuntimeError::runtime(
108 RuntimeErrorCode::InstanceNotFound,
109 "named instance was not found",
110 )
111 })?;
112 Ok(Self::for_instance(client, instance_id))
113 }
114 pub async fn read_file(&self, request: &PathRequest) -> Result<BinaryPayload, RuntimeError> {
115 let payload = encode_path_request(request);
116 let response = self
117 .inner
118 .client
119 .call(
120 self.inner.instance_id,
121 MethodId::new(FILE_SYSTEM_SERVICE_READ_FILE_METHOD_ID),
122 payload,
123 )
124 .await?;
125 decode_binary_payload(&response)
126 }
127 pub async fn write_file(&self, request: &WriteFileRequest) -> Result<Empty, RuntimeError> {
128 let payload = encode_write_file_request(request);
129 let response = self
130 .inner
131 .client
132 .call(
133 self.inner.instance_id,
134 MethodId::new(FILE_SYSTEM_SERVICE_WRITE_FILE_METHOD_ID),
135 payload,
136 )
137 .await?;
138 decode_empty(&response)
139 }
140 pub async fn append_file(&self, request: &WriteFileRequest) -> Result<Empty, RuntimeError> {
141 let payload = encode_write_file_request(request);
142 let response = self
143 .inner
144 .client
145 .call(
146 self.inner.instance_id,
147 MethodId::new(FILE_SYSTEM_SERVICE_APPEND_FILE_METHOD_ID),
148 payload,
149 )
150 .await?;
151 decode_empty(&response)
152 }
153 pub async fn mkdir(&self, request: &MkdirRequest) -> Result<Empty, RuntimeError> {
154 let payload = encode_mkdir_request(request);
155 let response = self
156 .inner
157 .client
158 .call(
159 self.inner.instance_id,
160 MethodId::new(FILE_SYSTEM_SERVICE_MKDIR_METHOD_ID),
161 payload,
162 )
163 .await?;
164 decode_empty(&response)
165 }
166 pub async fn read_dir(&self, request: &PathRequest) -> Result<Vec<DirEntry>, RuntimeError> {
167 let payload = encode_path_request(request);
168 let response = self
169 .inner
170 .client
171 .call(
172 self.inner.instance_id,
173 MethodId::new(FILE_SYSTEM_SERVICE_READ_DIR_METHOD_ID),
174 payload,
175 )
176 .await?;
177 match &response {
178 Value::Array(items) => items.iter().map(|item| decode_dir_entry(item)).collect(),
179 _ => Err(decode_error("field `response` must be list")),
180 }
181 }
182 pub async fn stat(&self, request: &PathRequest) -> Result<FileStat, RuntimeError> {
183 let payload = encode_path_request(request);
184 let response = self
185 .inner
186 .client
187 .call(
188 self.inner.instance_id,
189 MethodId::new(FILE_SYSTEM_SERVICE_STAT_METHOD_ID),
190 payload,
191 )
192 .await?;
193 decode_file_stat(&response)
194 }
195 pub async fn exists(&self, request: &PathRequest) -> Result<bool, RuntimeError> {
196 let payload = encode_path_request(request);
197 let response = self
198 .inner
199 .client
200 .call(
201 self.inner.instance_id,
202 MethodId::new(FILE_SYSTEM_SERVICE_EXISTS_METHOD_ID),
203 payload,
204 )
205 .await?;
206 (&response)
207 .as_bool()
208 .ok_or_else(|| decode_error("field `response` must be bool"))
209 }
210 pub async fn remove(&self, request: &RemoveRequest) -> Result<Empty, RuntimeError> {
211 let payload = encode_remove_request(request);
212 let response = self
213 .inner
214 .client
215 .call(
216 self.inner.instance_id,
217 MethodId::new(FILE_SYSTEM_SERVICE_REMOVE_METHOD_ID),
218 payload,
219 )
220 .await?;
221 decode_empty(&response)
222 }
223 pub async fn rename(&self, request: &RenameRequest) -> Result<Empty, RuntimeError> {
224 let payload = encode_rename_request(request);
225 let response = self
226 .inner
227 .client
228 .call(
229 self.inner.instance_id,
230 MethodId::new(FILE_SYSTEM_SERVICE_RENAME_METHOD_ID),
231 payload,
232 )
233 .await?;
234 decode_empty(&response)
235 }
236 pub async fn copy_file(&self, request: &RenameRequest) -> Result<Empty, RuntimeError> {
237 let payload = encode_rename_request(request);
238 let response = self
239 .inner
240 .client
241 .call(
242 self.inner.instance_id,
243 MethodId::new(FILE_SYSTEM_SERVICE_COPY_FILE_METHOD_ID),
244 payload,
245 )
246 .await?;
247 decode_empty(&response)
248 }
249 pub async fn open_file(
250 &self,
251 request: &OpenFileRequest,
252 ) -> Result<ResourceHandle, RuntimeError> {
253 let payload = encode_open_file_request(request);
254 let response = self
255 .inner
256 .client
257 .call(
258 self.inner.instance_id,
259 MethodId::new(FILE_SYSTEM_SERVICE_OPEN_FILE_METHOD_ID),
260 payload,
261 )
262 .await?;
263 decode_resource_handle(&response)
264 }
265 pub async fn file_read(
266 &self,
267 request: &FileReadRequest,
268 ) -> Result<BinaryPayload, RuntimeError> {
269 let payload = encode_file_read_request(request);
270 let response = self
271 .inner
272 .client
273 .call(
274 self.inner.instance_id,
275 MethodId::new(FILE_SYSTEM_SERVICE_FILE_READ_METHOD_ID),
276 payload,
277 )
278 .await?;
279 decode_binary_payload(&response)
280 }
281 pub async fn file_write(&self, request: &FileWriteRequest) -> Result<Empty, RuntimeError> {
282 let payload = encode_file_write_request(request);
283 let response = self
284 .inner
285 .client
286 .call(
287 self.inner.instance_id,
288 MethodId::new(FILE_SYSTEM_SERVICE_FILE_WRITE_METHOD_ID),
289 payload,
290 )
291 .await?;
292 decode_empty(&response)
293 }
294 pub async fn file_flush(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
295 let payload = encode_resource_handle(request);
296 let response = self
297 .inner
298 .client
299 .call(
300 self.inner.instance_id,
301 MethodId::new(FILE_SYSTEM_SERVICE_FILE_FLUSH_METHOD_ID),
302 payload,
303 )
304 .await?;
305 decode_empty(&response)
306 }
307 pub async fn file_seek(
308 &self,
309 request: &FileSeekRequest,
310 ) -> Result<FileSeekResult, RuntimeError> {
311 let payload = encode_file_seek_request(request);
312 let response = self
313 .inner
314 .client
315 .call(
316 self.inner.instance_id,
317 MethodId::new(FILE_SYSTEM_SERVICE_FILE_SEEK_METHOD_ID),
318 payload,
319 )
320 .await?;
321 decode_file_seek_result(&response)
322 }
323 pub async fn file_set_len(&self, request: &FileSetLenRequest) -> Result<Empty, RuntimeError> {
324 let payload = encode_file_set_len_request(request);
325 let response = self
326 .inner
327 .client
328 .call(
329 self.inner.instance_id,
330 MethodId::new(FILE_SYSTEM_SERVICE_FILE_SET_LEN_METHOD_ID),
331 payload,
332 )
333 .await?;
334 decode_empty(&response)
335 }
336 pub async fn file_close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
337 let payload = encode_resource_handle(request);
338 let response = self
339 .inner
340 .client
341 .call(
342 self.inner.instance_id,
343 MethodId::new(FILE_SYSTEM_SERVICE_FILE_CLOSE_METHOD_ID),
344 payload,
345 )
346 .await?;
347 decode_empty(&response)
348 }
349}
350
351#[derive(Clone)]
352pub struct RuntimeServiceClient {
353 inner: Arc<RuntimeServiceClientInner>,
354}
355
356struct RuntimeServiceClientInner {
357 client: RpcClient,
358 instance_id: InstanceId,
359}
360
361impl RuntimeServiceClient {
362 pub fn instance_id(&self) -> InstanceId {
363 self.inner.instance_id
364 }
365 pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
366 Self {
367 inner: Arc::new(RuntimeServiceClientInner {
368 client,
369 instance_id,
370 }),
371 }
372 }
373 pub async fn resolve_named(
374 client: RpcClient,
375 name: impl Into<String>,
376 ) -> Result<Self, RuntimeError> {
377 let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
378 let id = ids.pop().unwrap_or(0);
379 let instance_id = InstanceId::new(id).ok_or_else(|| {
380 RuntimeError::runtime(
381 RuntimeErrorCode::InstanceNotFound,
382 "named instance was not found",
383 )
384 })?;
385 Ok(Self::for_instance(client, instance_id))
386 }
387 pub async fn get_info(&self, request: &Empty) -> Result<RuntimeInfo, RuntimeError> {
388 let payload = encode_empty(request);
389 let response = self
390 .inner
391 .client
392 .call(
393 self.inner.instance_id,
394 MethodId::new(RUNTIME_SERVICE_GET_INFO_METHOD_ID),
395 payload,
396 )
397 .await?;
398 decode_runtime_info(&response)
399 }
400 pub async fn list_capabilities(&self, request: &Empty) -> Result<Vec<String>, RuntimeError> {
401 let payload = encode_empty(request);
402 let response = self
403 .inner
404 .client
405 .call(
406 self.inner.instance_id,
407 MethodId::new(RUNTIME_SERVICE_LIST_CAPABILITIES_METHOD_ID),
408 payload,
409 )
410 .await?;
411 match &response {
412 Value::Array(items) => items
413 .iter()
414 .map(|item| match item {
415 Value::String(value) => value
416 .as_str()
417 .map(ToOwned::to_owned)
418 .ok_or_else(|| decode_error("field `response` must be valid UTF-8")),
419 _ => Err(decode_error("field `response` must be string")),
420 })
421 .collect(),
422 _ => Err(decode_error("field `response` must be list")),
423 }
424 }
425 pub async fn dispose_resources(&self, request: &Empty) -> Result<Empty, RuntimeError> {
426 let payload = encode_empty(request);
427 let response = self
428 .inner
429 .client
430 .call(
431 self.inner.instance_id,
432 MethodId::new(RUNTIME_SERVICE_DISPOSE_RESOURCES_METHOD_ID),
433 payload,
434 )
435 .await?;
436 decode_empty(&response)
437 }
438}
439
440#[derive(Clone)]
441pub struct SqliteServiceClient {
442 inner: Arc<SqliteServiceClientInner>,
443}
444
445struct SqliteServiceClientInner {
446 client: RpcClient,
447 instance_id: InstanceId,
448}
449
450impl SqliteServiceClient {
451 pub fn instance_id(&self) -> InstanceId {
452 self.inner.instance_id
453 }
454 pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
455 Self {
456 inner: Arc::new(SqliteServiceClientInner {
457 client,
458 instance_id,
459 }),
460 }
461 }
462 pub async fn resolve_named(
463 client: RpcClient,
464 name: impl Into<String>,
465 ) -> Result<Self, RuntimeError> {
466 let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
467 let id = ids.pop().unwrap_or(0);
468 let instance_id = InstanceId::new(id).ok_or_else(|| {
469 RuntimeError::runtime(
470 RuntimeErrorCode::InstanceNotFound,
471 "named instance was not found",
472 )
473 })?;
474 Ok(Self::for_instance(client, instance_id))
475 }
476 pub async fn open(&self, request: &SqliteOpenRequest) -> Result<ResourceHandle, RuntimeError> {
477 let payload = encode_sqlite_open_request(request);
478 let response = self
479 .inner
480 .client
481 .call(
482 self.inner.instance_id,
483 MethodId::new(SQLITE_SERVICE_OPEN_METHOD_ID),
484 payload,
485 )
486 .await?;
487 decode_resource_handle(&response)
488 }
489 pub async fn close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
490 let payload = encode_resource_handle(request);
491 let response = self
492 .inner
493 .client
494 .call(
495 self.inner.instance_id,
496 MethodId::new(SQLITE_SERVICE_CLOSE_METHOD_ID),
497 payload,
498 )
499 .await?;
500 decode_empty(&response)
501 }
502 pub async fn execute_batch(
503 &self,
504 request: &SqliteStatementRequest,
505 ) -> Result<Empty, RuntimeError> {
506 let payload = encode_sqlite_statement_request(request);
507 let response = self
508 .inner
509 .client
510 .call(
511 self.inner.instance_id,
512 MethodId::new(SQLITE_SERVICE_EXECUTE_BATCH_METHOD_ID),
513 payload,
514 )
515 .await?;
516 decode_empty(&response)
517 }
518 pub async fn run(
519 &self,
520 request: &SqliteStatementRequest,
521 ) -> Result<SqliteRunResult, RuntimeError> {
522 let payload = encode_sqlite_statement_request(request);
523 let response = self
524 .inner
525 .client
526 .call(
527 self.inner.instance_id,
528 MethodId::new(SQLITE_SERVICE_RUN_METHOD_ID),
529 payload,
530 )
531 .await?;
532 decode_sqlite_run_result(&response)
533 }
534 pub async fn query_one(
535 &self,
536 request: &SqliteStatementRequest,
537 ) -> Result<Option<SqliteRow>, RuntimeError> {
538 let payload = encode_sqlite_statement_request(request);
539 let response = self
540 .inner
541 .client
542 .call(
543 self.inner.instance_id,
544 MethodId::new(SQLITE_SERVICE_QUERY_ONE_METHOD_ID),
545 payload,
546 )
547 .await?;
548 Ok(match &response {
549 Value::Nil => None,
550 value => Some(decode_sqlite_row(value)?),
551 })
552 }
553 pub async fn query_all(
554 &self,
555 request: &SqliteStatementRequest,
556 ) -> Result<Vec<SqliteRow>, RuntimeError> {
557 let payload = encode_sqlite_statement_request(request);
558 let response = self
559 .inner
560 .client
561 .call(
562 self.inner.instance_id,
563 MethodId::new(SQLITE_SERVICE_QUERY_ALL_METHOD_ID),
564 payload,
565 )
566 .await?;
567 match &response {
568 Value::Array(items) => items.iter().map(|item| decode_sqlite_row(item)).collect(),
569 _ => Err(decode_error("field `response` must be list")),
570 }
571 }
572 pub async fn transaction(
573 &self,
574 request: &SqliteTransactionRequest,
575 ) -> Result<Empty, RuntimeError> {
576 let payload = encode_sqlite_transaction_request(request);
577 let response = self
578 .inner
579 .client
580 .call(
581 self.inner.instance_id,
582 MethodId::new(SQLITE_SERVICE_TRANSACTION_METHOD_ID),
583 payload,
584 )
585 .await?;
586 decode_empty(&response)
587 }
588}
589
590#[derive(Clone)]
591pub struct SystemServiceClient {
592 inner: Arc<SystemServiceClientInner>,
593}
594
595struct SystemServiceClientInner {
596 client: RpcClient,
597 instance_id: InstanceId,
598}
599
600impl SystemServiceClient {
601 pub fn instance_id(&self) -> InstanceId {
602 self.inner.instance_id
603 }
604 pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
605 Self {
606 inner: Arc::new(SystemServiceClientInner {
607 client,
608 instance_id,
609 }),
610 }
611 }
612 pub async fn resolve_named(
613 client: RpcClient,
614 name: impl Into<String>,
615 ) -> Result<Self, RuntimeError> {
616 let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
617 let id = ids.pop().unwrap_or(0);
618 let instance_id = InstanceId::new(id).ok_or_else(|| {
619 RuntimeError::runtime(
620 RuntimeErrorCode::InstanceNotFound,
621 "named instance was not found",
622 )
623 })?;
624 Ok(Self::for_instance(client, instance_id))
625 }
626 pub async fn get_power_capabilities(
627 &self,
628 request: &Empty,
629 ) -> Result<Vec<String>, RuntimeError> {
630 let payload = encode_empty(request);
631 let response = self
632 .inner
633 .client
634 .call(
635 self.inner.instance_id,
636 MethodId::new(SYSTEM_SERVICE_GET_POWER_CAPABILITIES_METHOD_ID),
637 payload,
638 )
639 .await?;
640 match &response {
641 Value::Array(items) => items
642 .iter()
643 .map(|item| match item {
644 Value::String(value) => value
645 .as_str()
646 .map(ToOwned::to_owned)
647 .ok_or_else(|| decode_error("field `response` must be valid UTF-8")),
648 _ => Err(decode_error("field `response` must be string")),
649 })
650 .collect(),
651 _ => Err(decode_error("field `response` must be list")),
652 }
653 }
654 pub async fn shutdown(&self, request: &PowerOptions) -> Result<Empty, RuntimeError> {
655 let payload = encode_power_options(request);
656 let response = self
657 .inner
658 .client
659 .call(
660 self.inner.instance_id,
661 MethodId::new(SYSTEM_SERVICE_SHUTDOWN_METHOD_ID),
662 payload,
663 )
664 .await?;
665 decode_empty(&response)
666 }
667 pub async fn reboot(&self, request: &PowerOptions) -> Result<Empty, RuntimeError> {
668 let payload = encode_power_options(request);
669 let response = self
670 .inner
671 .client
672 .call(
673 self.inner.instance_id,
674 MethodId::new(SYSTEM_SERVICE_REBOOT_METHOD_ID),
675 payload,
676 )
677 .await?;
678 decode_empty(&response)
679 }
680}
681
682#[derive(Clone)]
683pub struct TcpServiceClient {
684 inner: Arc<TcpServiceClientInner>,
685}
686
687struct TcpServiceClientInner {
688 client: RpcClient,
689 instance_id: InstanceId,
690 event_sender: Mutex<Option<broadcast::Sender<Result<TcpEvent, RuntimeError>>>>,
691}
692
693impl TcpServiceClient {
694 pub fn instance_id(&self) -> InstanceId {
695 self.inner.instance_id
696 }
697 pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
698 Self {
699 inner: Arc::new(TcpServiceClientInner {
700 client,
701 instance_id,
702 event_sender: Mutex::new(None),
703 }),
704 }
705 }
706 pub async fn resolve_named(
707 client: RpcClient,
708 name: impl Into<String>,
709 ) -> Result<Self, RuntimeError> {
710 let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
711 let id = ids.pop().unwrap_or(0);
712 let instance_id = InstanceId::new(id).ok_or_else(|| {
713 RuntimeError::runtime(
714 RuntimeErrorCode::InstanceNotFound,
715 "named instance was not found",
716 )
717 })?;
718 Ok(Self::for_instance(client, instance_id))
719 }
720 pub async fn connect(
721 &self,
722 request: &SocketConnectRequest,
723 ) -> Result<ResourceHandle, RuntimeError> {
724 let payload = encode_socket_connect_request(request);
725 let response = self
726 .inner
727 .client
728 .call(
729 self.inner.instance_id,
730 MethodId::new(TCP_SERVICE_CONNECT_METHOD_ID),
731 payload,
732 )
733 .await?;
734 decode_resource_handle(&response)
735 }
736 pub async fn socket_write(&self, request: &SocketWriteRequest) -> Result<Empty, RuntimeError> {
737 let payload = encode_socket_write_request(request);
738 let response = self
739 .inner
740 .client
741 .call(
742 self.inner.instance_id,
743 MethodId::new(TCP_SERVICE_SOCKET_WRITE_METHOD_ID),
744 payload,
745 )
746 .await?;
747 decode_empty(&response)
748 }
749 pub async fn socket_end(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
750 let payload = encode_resource_handle(request);
751 let response = self
752 .inner
753 .client
754 .call(
755 self.inner.instance_id,
756 MethodId::new(TCP_SERVICE_SOCKET_END_METHOD_ID),
757 payload,
758 )
759 .await?;
760 decode_empty(&response)
761 }
762 pub async fn socket_close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
763 let payload = encode_resource_handle(request);
764 let response = self
765 .inner
766 .client
767 .call(
768 self.inner.instance_id,
769 MethodId::new(TCP_SERVICE_SOCKET_CLOSE_METHOD_ID),
770 payload,
771 )
772 .await?;
773 decode_empty(&response)
774 }
775 pub async fn server_listen(
776 &self,
777 request: &SocketListenRequest,
778 ) -> Result<ListenResult, RuntimeError> {
779 let payload = encode_socket_listen_request(request);
780 let response = self
781 .inner
782 .client
783 .call(
784 self.inner.instance_id,
785 MethodId::new(TCP_SERVICE_SERVER_LISTEN_METHOD_ID),
786 payload,
787 )
788 .await?;
789 decode_listen_result(&response)
790 }
791 pub async fn server_close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
792 let payload = encode_resource_handle(request);
793 let response = self
794 .inner
795 .client
796 .call(
797 self.inner.instance_id,
798 MethodId::new(TCP_SERVICE_SERVER_CLOSE_METHOD_ID),
799 payload,
800 )
801 .await?;
802 decode_empty(&response)
803 }
804 pub fn subscribe_event(&self) -> TcpServiceEventReceiver {
805 let mut sender = self
806 .inner
807 .event_sender
808 .lock()
809 .expect("notification sender mutex poisoned");
810 if sender.is_none() {
811 let (tx, _) = broadcast::channel(128);
812 let forward_tx = tx.clone();
813 let mut source = self.inner.client.subscribe_notifications(
814 Some(self.inner.instance_id),
815 Some(TCP_SERVICE_EVENT_NOTIFICATION_ID),
816 );
817 tokio::spawn(async move {
818 while let Some(notification) = source.recv().await {
819 let _ = forward_tx.send(decode_tcp_event(¬ification.payload));
820 }
821 });
822 *sender = Some(tx);
823 }
824 TcpServiceEventReceiver {
825 inner: sender
826 .as_ref()
827 .expect("notification sender initialized")
828 .subscribe(),
829 }
830 }
831}
832
833pub struct TcpServiceEventReceiver {
834 inner: broadcast::Receiver<Result<TcpEvent, RuntimeError>>,
835}
836
837impl TcpServiceEventReceiver {
838 pub async fn recv(&mut self) -> Option<Result<TcpEvent, RuntimeError>> {
839 match self.inner.recv().await {
840 Ok(value) => Some(value),
841 Err(broadcast::error::RecvError::Closed) => None,
842 Err(broadcast::error::RecvError::Lagged(skipped)) => Some(Err(RuntimeError::runtime(
843 RuntimeErrorCode::InternalRuntimeError,
844 format!("notification receiver lagged by {skipped} messages"),
845 ))),
846 }
847 }
848}
849
850#[derive(Clone)]
851pub struct WebSocketServiceClient {
852 inner: Arc<WebSocketServiceClientInner>,
853}
854
855struct WebSocketServiceClientInner {
856 client: RpcClient,
857 instance_id: InstanceId,
858 event_sender: Mutex<Option<broadcast::Sender<Result<WebSocketEvent, RuntimeError>>>>,
859}
860
861impl WebSocketServiceClient {
862 pub fn instance_id(&self) -> InstanceId {
863 self.inner.instance_id
864 }
865 pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
866 Self {
867 inner: Arc::new(WebSocketServiceClientInner {
868 client,
869 instance_id,
870 event_sender: Mutex::new(None),
871 }),
872 }
873 }
874 pub async fn resolve_named(
875 client: RpcClient,
876 name: impl Into<String>,
877 ) -> Result<Self, RuntimeError> {
878 let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
879 let id = ids.pop().unwrap_or(0);
880 let instance_id = InstanceId::new(id).ok_or_else(|| {
881 RuntimeError::runtime(
882 RuntimeErrorCode::InstanceNotFound,
883 "named instance was not found",
884 )
885 })?;
886 Ok(Self::for_instance(client, instance_id))
887 }
888 pub async fn connect(
889 &self,
890 request: &WebSocketConnectRequest,
891 ) -> Result<ResourceHandle, RuntimeError> {
892 let payload = encode_web_socket_connect_request(request);
893 let response = self
894 .inner
895 .client
896 .call(
897 self.inner.instance_id,
898 MethodId::new(WEB_SOCKET_SERVICE_CONNECT_METHOD_ID),
899 payload,
900 )
901 .await?;
902 decode_resource_handle(&response)
903 }
904 pub async fn send_text(
905 &self,
906 request: &WebSocketSendTextRequest,
907 ) -> Result<Empty, RuntimeError> {
908 let payload = encode_web_socket_send_text_request(request);
909 let response = self
910 .inner
911 .client
912 .call(
913 self.inner.instance_id,
914 MethodId::new(WEB_SOCKET_SERVICE_SEND_TEXT_METHOD_ID),
915 payload,
916 )
917 .await?;
918 decode_empty(&response)
919 }
920 pub async fn send_binary(&self, request: &SocketWriteRequest) -> Result<Empty, RuntimeError> {
921 let payload = encode_socket_write_request(request);
922 let response = self
923 .inner
924 .client
925 .call(
926 self.inner.instance_id,
927 MethodId::new(WEB_SOCKET_SERVICE_SEND_BINARY_METHOD_ID),
928 payload,
929 )
930 .await?;
931 decode_empty(&response)
932 }
933 pub async fn close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
934 let payload = encode_resource_handle(request);
935 let response = self
936 .inner
937 .client
938 .call(
939 self.inner.instance_id,
940 MethodId::new(WEB_SOCKET_SERVICE_CLOSE_METHOD_ID),
941 payload,
942 )
943 .await?;
944 decode_empty(&response)
945 }
946 pub async fn server_listen(
947 &self,
948 request: &SocketListenRequest,
949 ) -> Result<ListenResult, RuntimeError> {
950 let payload = encode_socket_listen_request(request);
951 let response = self
952 .inner
953 .client
954 .call(
955 self.inner.instance_id,
956 MethodId::new(WEB_SOCKET_SERVICE_SERVER_LISTEN_METHOD_ID),
957 payload,
958 )
959 .await?;
960 decode_listen_result(&response)
961 }
962 pub async fn server_close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
963 let payload = encode_resource_handle(request);
964 let response = self
965 .inner
966 .client
967 .call(
968 self.inner.instance_id,
969 MethodId::new(WEB_SOCKET_SERVICE_SERVER_CLOSE_METHOD_ID),
970 payload,
971 )
972 .await?;
973 decode_empty(&response)
974 }
975 pub fn subscribe_event(&self) -> WebSocketServiceEventReceiver {
976 let mut sender = self
977 .inner
978 .event_sender
979 .lock()
980 .expect("notification sender mutex poisoned");
981 if sender.is_none() {
982 let (tx, _) = broadcast::channel(128);
983 let forward_tx = tx.clone();
984 let mut source = self.inner.client.subscribe_notifications(
985 Some(self.inner.instance_id),
986 Some(WEB_SOCKET_SERVICE_EVENT_NOTIFICATION_ID),
987 );
988 tokio::spawn(async move {
989 while let Some(notification) = source.recv().await {
990 let _ = forward_tx.send(decode_web_socket_event(¬ification.payload));
991 }
992 });
993 *sender = Some(tx);
994 }
995 WebSocketServiceEventReceiver {
996 inner: sender
997 .as_ref()
998 .expect("notification sender initialized")
999 .subscribe(),
1000 }
1001 }
1002}
1003
1004pub struct WebSocketServiceEventReceiver {
1005 inner: broadcast::Receiver<Result<WebSocketEvent, RuntimeError>>,
1006}
1007
1008impl WebSocketServiceEventReceiver {
1009 pub async fn recv(&mut self) -> Option<Result<WebSocketEvent, RuntimeError>> {
1010 match self.inner.recv().await {
1011 Ok(value) => Some(value),
1012 Err(broadcast::error::RecvError::Closed) => None,
1013 Err(broadcast::error::RecvError::Lagged(skipped)) => Some(Err(RuntimeError::runtime(
1014 RuntimeErrorCode::InternalRuntimeError,
1015 format!("notification receiver lagged by {skipped} messages"),
1016 ))),
1017 }
1018 }
1019}