use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::Mutex;
use rmpv::Value;
use rpc_runtime_client::RpcClient;
use rpc_runtime_core::{InstanceId, MethodId};
use rpc_runtime_errors::{RuntimeError, RuntimeErrorCode};
use tokio::sync::broadcast;
use super::services::*;
use super::types::*;
/// Client handle for the archive service, bound to one remote instance.
///
/// Cloning the handle clones the `Arc`, so all clones share the same inner state.
#[derive(Clone)]
pub struct ArchiveServiceClient {
// Shared, reference-counted state backing every clone of this handle.
inner: Arc<ArchiveServiceClientInner>,
}
/// State shared by all clones of an `ArchiveServiceClient`.
struct ArchiveServiceClientInner {
// RPC client used to issue every call.
client: RpcClient,
// Identifier of the instance that calls are addressed to.
instance_id: InstanceId,
}
impl ArchiveServiceClient {
    /// Returns the identifier of the instance this client addresses.
    pub fn instance_id(&self) -> InstanceId {
        self.inner.instance_id
    }

    /// Builds a client that sends its calls to `instance_id` through `client`.
    pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
        let state = ArchiveServiceClientInner {
            client,
            instance_id,
        };
        Self {
            inner: Arc::new(state),
        }
    }

    /// Resolves `name` to an instance id and builds a client bound to it.
    ///
    /// # Errors
    /// Propagates a failed resolution call. Returns `InstanceNotFound` when
    /// `InstanceId::new` rejects the resolved id; an empty result is treated as id `0`.
    pub async fn resolve_named(
        client: RpcClient,
        name: impl Into<String>,
    ) -> Result<Self, RuntimeError> {
        let mut resolved = client.resolve_instance_ids(vec![name.into()]).await?;
        match InstanceId::new(resolved.pop().unwrap_or(0)) {
            Some(instance_id) => Ok(Self::for_instance(client, instance_id)),
            None => Err(RuntimeError::runtime(
                RuntimeErrorCode::InstanceNotFound,
                "named instance was not found",
            )),
        }
    }

    /// Invokes `zip` on the bound instance and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn zip(&self, request: &ZipRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_zip_request(request);
        let state = &*self.inner;
        let method = MethodId::new(ARCHIVE_SERVICE_ZIP_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `unzip` on the bound instance and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn unzip(&self, request: &UnzipRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_unzip_request(request);
        let state = &*self.inner;
        let method = MethodId::new(ARCHIVE_SERVICE_UNZIP_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }
}
/// Client handle for the file-system service, bound to one remote instance.
///
/// Cloning the handle clones the `Arc`, so all clones share the same inner state.
#[derive(Clone)]
pub struct FileSystemServiceClient {
// Shared, reference-counted state backing every clone of this handle.
inner: Arc<FileSystemServiceClientInner>,
}
/// State shared by all clones of a `FileSystemServiceClient`.
struct FileSystemServiceClientInner {
// RPC client used to issue every call.
client: RpcClient,
// Identifier of the instance that calls are addressed to.
instance_id: InstanceId,
}
impl FileSystemServiceClient {
    /// Returns the identifier of the instance this client addresses.
    pub fn instance_id(&self) -> InstanceId {
        self.inner.instance_id
    }

    /// Builds a client that sends its calls to `instance_id` through `client`.
    pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
        let state = FileSystemServiceClientInner {
            client,
            instance_id,
        };
        Self {
            inner: Arc::new(state),
        }
    }

    /// Resolves `name` to an instance id and builds a client bound to it.
    ///
    /// # Errors
    /// Propagates a failed resolution call. Returns `InstanceNotFound` when
    /// `InstanceId::new` rejects the resolved id; an empty result is treated as id `0`.
    pub async fn resolve_named(
        client: RpcClient,
        name: impl Into<String>,
    ) -> Result<Self, RuntimeError> {
        let mut resolved = client.resolve_instance_ids(vec![name.into()]).await?;
        match InstanceId::new(resolved.pop().unwrap_or(0)) {
            Some(instance_id) => Ok(Self::for_instance(client, instance_id)),
            None => Err(RuntimeError::runtime(
                RuntimeErrorCode::InstanceNotFound,
                "named instance was not found",
            )),
        }
    }

    /// Invokes `read_file` and decodes the reply as a [`BinaryPayload`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn read_file(&self, request: &PathRequest) -> Result<BinaryPayload, RuntimeError> {
        let encoded = encode_path_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_READ_FILE_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_binary_payload(&reply)
    }

    /// Invokes `write_file` and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn write_file(&self, request: &WriteFileRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_write_file_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_WRITE_FILE_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `append_file` and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn append_file(&self, request: &WriteFileRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_write_file_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_APPEND_FILE_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `mkdir` and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn mkdir(&self, request: &MkdirRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_mkdir_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_MKDIR_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `read_dir` and decodes each element of the reply as a [`DirEntry`].
    ///
    /// # Errors
    /// Returns the RPC failure, a decode error when the reply is not an array,
    /// or the first error produced while decoding an entry.
    pub async fn read_dir(&self, request: &PathRequest) -> Result<Vec<DirEntry>, RuntimeError> {
        let encoded = encode_path_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_READ_DIR_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        if let Value::Array(entries) = &reply {
            entries.iter().map(|entry| decode_dir_entry(entry)).collect()
        } else {
            Err(decode_error("field `response` must be list"))
        }
    }

    /// Invokes `stat` and decodes the reply as a [`FileStat`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn stat(&self, request: &PathRequest) -> Result<FileStat, RuntimeError> {
        let encoded = encode_path_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_STAT_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_file_stat(&reply)
    }

    /// Invokes `exists` and returns the boolean carried by the reply.
    ///
    /// # Errors
    /// Returns the RPC failure, or a decode error when the reply is not a boolean.
    pub async fn exists(&self, request: &PathRequest) -> Result<bool, RuntimeError> {
        let encoded = encode_path_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_EXISTS_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        match reply.as_bool() {
            Some(flag) => Ok(flag),
            None => Err(decode_error("field `response` must be bool")),
        }
    }

    /// Invokes `remove` and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn remove(&self, request: &RemoveRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_remove_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_REMOVE_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `rename` and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn rename(&self, request: &RenameRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_rename_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_RENAME_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `copy_file` (which takes a [`RenameRequest`]) and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn copy_file(&self, request: &RenameRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_rename_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_COPY_FILE_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `open_file` and decodes the reply as a [`ResourceHandle`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn open_file(
        &self,
        request: &OpenFileRequest,
    ) -> Result<ResourceHandle, RuntimeError> {
        let encoded = encode_open_file_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_OPEN_FILE_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_resource_handle(&reply)
    }

    /// Invokes `file_read` and decodes the reply as a [`BinaryPayload`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn file_read(
        &self,
        request: &FileReadRequest,
    ) -> Result<BinaryPayload, RuntimeError> {
        let encoded = encode_file_read_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_FILE_READ_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_binary_payload(&reply)
    }

    /// Invokes `file_write` and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn file_write(&self, request: &FileWriteRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_file_write_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_FILE_WRITE_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `file_flush` for the given handle and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn file_flush(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
        let encoded = encode_resource_handle(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_FILE_FLUSH_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `file_seek` and decodes the reply as a [`FileSeekResult`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn file_seek(
        &self,
        request: &FileSeekRequest,
    ) -> Result<FileSeekResult, RuntimeError> {
        let encoded = encode_file_seek_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_FILE_SEEK_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_file_seek_result(&reply)
    }

    /// Invokes `file_set_len` and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn file_set_len(&self, request: &FileSetLenRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_file_set_len_request(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_FILE_SET_LEN_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `file_close` for the given handle and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn file_close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
        let encoded = encode_resource_handle(request);
        let state = &*self.inner;
        let method = MethodId::new(FILE_SYSTEM_SERVICE_FILE_CLOSE_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }
}
/// Client handle for the runtime service, bound to one remote instance.
///
/// Cloning the handle clones the `Arc`, so all clones share the same inner state.
#[derive(Clone)]
pub struct RuntimeServiceClient {
// Shared, reference-counted state backing every clone of this handle.
inner: Arc<RuntimeServiceClientInner>,
}
/// State shared by all clones of a `RuntimeServiceClient`.
struct RuntimeServiceClientInner {
// RPC client used to issue every call.
client: RpcClient,
// Identifier of the instance that calls are addressed to.
instance_id: InstanceId,
}
impl RuntimeServiceClient {
pub fn instance_id(&self) -> InstanceId {
self.inner.instance_id
}
pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
Self {
inner: Arc::new(RuntimeServiceClientInner {
client,
instance_id,
}),
}
}
pub async fn resolve_named(
client: RpcClient,
name: impl Into<String>,
) -> Result<Self, RuntimeError> {
let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
let id = ids.pop().unwrap_or(0);
let instance_id = InstanceId::new(id).ok_or_else(|| {
RuntimeError::runtime(
RuntimeErrorCode::InstanceNotFound,
"named instance was not found",
)
})?;
Ok(Self::for_instance(client, instance_id))
}
pub async fn get_info(&self, request: &Empty) -> Result<RuntimeInfo, RuntimeError> {
let payload = encode_empty(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(RUNTIME_SERVICE_GET_INFO_METHOD_ID),
payload,
)
.await?;
decode_runtime_info(&response)
}
pub async fn list_capabilities(&self, request: &Empty) -> Result<Vec<String>, RuntimeError> {
let payload = encode_empty(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(RUNTIME_SERVICE_LIST_CAPABILITIES_METHOD_ID),
payload,
)
.await?;
match &response {
Value::Array(items) => items
.iter()
.map(|item| match item {
Value::String(value) => value
.as_str()
.map(ToOwned::to_owned)
.ok_or_else(|| decode_error("field `response` must be valid UTF-8")),
_ => Err(decode_error("field `response` must be string")),
})
.collect(),
_ => Err(decode_error("field `response` must be list")),
}
}
pub async fn dispose_resources(&self, request: &Empty) -> Result<Empty, RuntimeError> {
let payload = encode_empty(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(RUNTIME_SERVICE_DISPOSE_RESOURCES_METHOD_ID),
payload,
)
.await?;
decode_empty(&response)
}
}
/// Client handle for the SQLite service, bound to one remote instance.
///
/// Cloning the handle clones the `Arc`, so all clones share the same inner state.
#[derive(Clone)]
pub struct SqliteServiceClient {
// Shared, reference-counted state backing every clone of this handle.
inner: Arc<SqliteServiceClientInner>,
}
/// State shared by all clones of a `SqliteServiceClient`.
struct SqliteServiceClientInner {
// RPC client used to issue every call.
client: RpcClient,
// Identifier of the instance that calls are addressed to.
instance_id: InstanceId,
}
impl SqliteServiceClient {
pub fn instance_id(&self) -> InstanceId {
self.inner.instance_id
}
pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
Self {
inner: Arc::new(SqliteServiceClientInner {
client,
instance_id,
}),
}
}
pub async fn resolve_named(
client: RpcClient,
name: impl Into<String>,
) -> Result<Self, RuntimeError> {
let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
let id = ids.pop().unwrap_or(0);
let instance_id = InstanceId::new(id).ok_or_else(|| {
RuntimeError::runtime(
RuntimeErrorCode::InstanceNotFound,
"named instance was not found",
)
})?;
Ok(Self::for_instance(client, instance_id))
}
pub async fn open(&self, request: &SqliteOpenRequest) -> Result<ResourceHandle, RuntimeError> {
let payload = encode_sqlite_open_request(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(SQLITE_SERVICE_OPEN_METHOD_ID),
payload,
)
.await?;
decode_resource_handle(&response)
}
pub async fn close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
let payload = encode_resource_handle(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(SQLITE_SERVICE_CLOSE_METHOD_ID),
payload,
)
.await?;
decode_empty(&response)
}
pub async fn execute_batch(
&self,
request: &SqliteStatementRequest,
) -> Result<Empty, RuntimeError> {
let payload = encode_sqlite_statement_request(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(SQLITE_SERVICE_EXECUTE_BATCH_METHOD_ID),
payload,
)
.await?;
decode_empty(&response)
}
pub async fn run(
&self,
request: &SqliteStatementRequest,
) -> Result<SqliteRunResult, RuntimeError> {
let payload = encode_sqlite_statement_request(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(SQLITE_SERVICE_RUN_METHOD_ID),
payload,
)
.await?;
decode_sqlite_run_result(&response)
}
pub async fn query_one(
&self,
request: &SqliteStatementRequest,
) -> Result<Option<SqliteRow>, RuntimeError> {
let payload = encode_sqlite_statement_request(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(SQLITE_SERVICE_QUERY_ONE_METHOD_ID),
payload,
)
.await?;
Ok(match &response {
Value::Nil => None,
value => Some(decode_sqlite_row(value)?),
})
}
pub async fn query_all(
&self,
request: &SqliteStatementRequest,
) -> Result<Vec<SqliteRow>, RuntimeError> {
let payload = encode_sqlite_statement_request(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(SQLITE_SERVICE_QUERY_ALL_METHOD_ID),
payload,
)
.await?;
match &response {
Value::Array(items) => items.iter().map(|item| decode_sqlite_row(item)).collect(),
_ => Err(decode_error("field `response` must be list")),
}
}
pub async fn transaction(
&self,
request: &SqliteTransactionRequest,
) -> Result<Empty, RuntimeError> {
let payload = encode_sqlite_transaction_request(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(SQLITE_SERVICE_TRANSACTION_METHOD_ID),
payload,
)
.await?;
decode_empty(&response)
}
}
/// Client handle for the system service, bound to one remote instance.
///
/// Cloning the handle clones the `Arc`, so all clones share the same inner state.
#[derive(Clone)]
pub struct SystemServiceClient {
// Shared, reference-counted state backing every clone of this handle.
inner: Arc<SystemServiceClientInner>,
}
/// State shared by all clones of a `SystemServiceClient`.
struct SystemServiceClientInner {
// RPC client used to issue every call.
client: RpcClient,
// Identifier of the instance that calls are addressed to.
instance_id: InstanceId,
}
impl SystemServiceClient {
pub fn instance_id(&self) -> InstanceId {
self.inner.instance_id
}
pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
Self {
inner: Arc::new(SystemServiceClientInner {
client,
instance_id,
}),
}
}
pub async fn resolve_named(
client: RpcClient,
name: impl Into<String>,
) -> Result<Self, RuntimeError> {
let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
let id = ids.pop().unwrap_or(0);
let instance_id = InstanceId::new(id).ok_or_else(|| {
RuntimeError::runtime(
RuntimeErrorCode::InstanceNotFound,
"named instance was not found",
)
})?;
Ok(Self::for_instance(client, instance_id))
}
pub async fn get_power_capabilities(
&self,
request: &Empty,
) -> Result<Vec<String>, RuntimeError> {
let payload = encode_empty(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(SYSTEM_SERVICE_GET_POWER_CAPABILITIES_METHOD_ID),
payload,
)
.await?;
match &response {
Value::Array(items) => items
.iter()
.map(|item| match item {
Value::String(value) => value
.as_str()
.map(ToOwned::to_owned)
.ok_or_else(|| decode_error("field `response` must be valid UTF-8")),
_ => Err(decode_error("field `response` must be string")),
})
.collect(),
_ => Err(decode_error("field `response` must be list")),
}
}
pub async fn shutdown(&self, request: &PowerOptions) -> Result<Empty, RuntimeError> {
let payload = encode_power_options(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(SYSTEM_SERVICE_SHUTDOWN_METHOD_ID),
payload,
)
.await?;
decode_empty(&response)
}
pub async fn reboot(&self, request: &PowerOptions) -> Result<Empty, RuntimeError> {
let payload = encode_power_options(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(SYSTEM_SERVICE_REBOOT_METHOD_ID),
payload,
)
.await?;
decode_empty(&response)
}
}
/// Client handle for the TCP service, bound to one remote instance.
///
/// Cloning the handle clones the `Arc`, so all clones share the same inner state,
/// including the lazily created event sender.
#[derive(Clone)]
pub struct TcpServiceClient {
// Shared, reference-counted state backing every clone of this handle.
inner: Arc<TcpServiceClientInner>,
}
/// State shared by all clones of a `TcpServiceClient`.
struct TcpServiceClientInner {
// RPC client used to issue every call and to subscribe to notifications.
client: RpcClient,
// Identifier of the instance that calls are addressed to.
instance_id: InstanceId,
// Broadcast sender for decoded events; starts as `None` and is filled in by
// the first `subscribe_event` call.
event_sender: Mutex<Option<broadcast::Sender<Result<TcpEvent, RuntimeError>>>>,
}
impl TcpServiceClient {
pub fn instance_id(&self) -> InstanceId {
self.inner.instance_id
}
pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
Self {
inner: Arc::new(TcpServiceClientInner {
client,
instance_id,
event_sender: Mutex::new(None),
}),
}
}
pub async fn resolve_named(
client: RpcClient,
name: impl Into<String>,
) -> Result<Self, RuntimeError> {
let mut ids = client.resolve_instance_ids(vec![name.into()]).await?;
let id = ids.pop().unwrap_or(0);
let instance_id = InstanceId::new(id).ok_or_else(|| {
RuntimeError::runtime(
RuntimeErrorCode::InstanceNotFound,
"named instance was not found",
)
})?;
Ok(Self::for_instance(client, instance_id))
}
pub async fn connect(
&self,
request: &SocketConnectRequest,
) -> Result<ResourceHandle, RuntimeError> {
let payload = encode_socket_connect_request(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(TCP_SERVICE_CONNECT_METHOD_ID),
payload,
)
.await?;
decode_resource_handle(&response)
}
pub async fn socket_write(&self, request: &SocketWriteRequest) -> Result<Empty, RuntimeError> {
let payload = encode_socket_write_request(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(TCP_SERVICE_SOCKET_WRITE_METHOD_ID),
payload,
)
.await?;
decode_empty(&response)
}
pub async fn socket_end(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
let payload = encode_resource_handle(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(TCP_SERVICE_SOCKET_END_METHOD_ID),
payload,
)
.await?;
decode_empty(&response)
}
pub async fn socket_close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
let payload = encode_resource_handle(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(TCP_SERVICE_SOCKET_CLOSE_METHOD_ID),
payload,
)
.await?;
decode_empty(&response)
}
pub async fn server_listen(
&self,
request: &SocketListenRequest,
) -> Result<ListenResult, RuntimeError> {
let payload = encode_socket_listen_request(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(TCP_SERVICE_SERVER_LISTEN_METHOD_ID),
payload,
)
.await?;
decode_listen_result(&response)
}
pub async fn server_close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
let payload = encode_resource_handle(request);
let response = self
.inner
.client
.call(
self.inner.instance_id,
MethodId::new(TCP_SERVICE_SERVER_CLOSE_METHOD_ID),
payload,
)
.await?;
decode_empty(&response)
}
pub fn subscribe_event(&self) -> TcpServiceEventReceiver {
let mut sender = self
.inner
.event_sender
.lock()
.expect("notification sender mutex poisoned");
if sender.is_none() {
let (tx, _) = broadcast::channel(128);
let forward_tx = tx.clone();
let mut source = self.inner.client.subscribe_notifications(
Some(self.inner.instance_id),
Some(TCP_SERVICE_EVENT_NOTIFICATION_ID),
);
tokio::spawn(async move {
while let Some(notification) = source.recv().await {
let _ = forward_tx.send(decode_tcp_event(¬ification.payload));
}
});
*sender = Some(tx);
}
TcpServiceEventReceiver {
inner: sender
.as_ref()
.expect("notification sender initialized")
.subscribe(),
}
}
}
/// Receiving end of the TCP event stream handed out by `TcpServiceClient::subscribe_event`.
pub struct TcpServiceEventReceiver {
// Underlying broadcast receiver carrying decoded events or decode errors.
inner: broadcast::Receiver<Result<TcpEvent, RuntimeError>>,
}
impl TcpServiceEventReceiver {
pub async fn recv(&mut self) -> Option<Result<TcpEvent, RuntimeError>> {
match self.inner.recv().await {
Ok(value) => Some(value),
Err(broadcast::error::RecvError::Closed) => None,
Err(broadcast::error::RecvError::Lagged(skipped)) => Some(Err(RuntimeError::runtime(
RuntimeErrorCode::InternalRuntimeError,
format!("notification receiver lagged by {skipped} messages"),
))),
}
}
}
/// Client handle for the WebSocket service, bound to one remote instance.
///
/// Cloning the handle clones the `Arc`, so all clones share the same inner state,
/// including the lazily created event sender.
#[derive(Clone)]
pub struct WebSocketServiceClient {
// Shared, reference-counted state backing every clone of this handle.
inner: Arc<WebSocketServiceClientInner>,
}
/// State shared by all clones of a `WebSocketServiceClient`.
struct WebSocketServiceClientInner {
// RPC client used to issue every call and to subscribe to notifications.
client: RpcClient,
// Identifier of the instance that calls are addressed to.
instance_id: InstanceId,
// Broadcast sender for decoded events; starts as `None` and is filled in by
// the first `subscribe_event` call.
event_sender: Mutex<Option<broadcast::Sender<Result<WebSocketEvent, RuntimeError>>>>,
}
impl WebSocketServiceClient {
    /// Returns the identifier of the instance this client addresses.
    pub fn instance_id(&self) -> InstanceId {
        self.inner.instance_id
    }

    /// Builds a client that sends its calls to `instance_id` through `client`.
    ///
    /// No event forwarding is started until [`Self::subscribe_event`] is called.
    pub fn for_instance(client: RpcClient, instance_id: InstanceId) -> Self {
        let state = WebSocketServiceClientInner {
            client,
            instance_id,
            event_sender: Mutex::new(None),
        };
        Self {
            inner: Arc::new(state),
        }
    }

    /// Resolves `name` to an instance id and builds a client bound to it.
    ///
    /// # Errors
    /// Propagates a failed resolution call. Returns `InstanceNotFound` when
    /// `InstanceId::new` rejects the resolved id; an empty result is treated as id `0`.
    pub async fn resolve_named(
        client: RpcClient,
        name: impl Into<String>,
    ) -> Result<Self, RuntimeError> {
        let mut resolved = client.resolve_instance_ids(vec![name.into()]).await?;
        match InstanceId::new(resolved.pop().unwrap_or(0)) {
            Some(instance_id) => Ok(Self::for_instance(client, instance_id)),
            None => Err(RuntimeError::runtime(
                RuntimeErrorCode::InstanceNotFound,
                "named instance was not found",
            )),
        }
    }

    /// Invokes `connect` and decodes the reply as a [`ResourceHandle`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn connect(
        &self,
        request: &WebSocketConnectRequest,
    ) -> Result<ResourceHandle, RuntimeError> {
        let encoded = encode_web_socket_connect_request(request);
        let state = &*self.inner;
        let method = MethodId::new(WEB_SOCKET_SERVICE_CONNECT_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_resource_handle(&reply)
    }

    /// Invokes `send_text` and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn send_text(
        &self,
        request: &WebSocketSendTextRequest,
    ) -> Result<Empty, RuntimeError> {
        let encoded = encode_web_socket_send_text_request(request);
        let state = &*self.inner;
        let method = MethodId::new(WEB_SOCKET_SERVICE_SEND_TEXT_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `send_binary` (which takes a [`SocketWriteRequest`]) and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn send_binary(&self, request: &SocketWriteRequest) -> Result<Empty, RuntimeError> {
        let encoded = encode_socket_write_request(request);
        let state = &*self.inner;
        let method = MethodId::new(WEB_SOCKET_SERVICE_SEND_BINARY_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `close` for the given handle and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
        let encoded = encode_resource_handle(request);
        let state = &*self.inner;
        let method = MethodId::new(WEB_SOCKET_SERVICE_CLOSE_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Invokes `server_listen` and decodes the reply as a [`ListenResult`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn server_listen(
        &self,
        request: &SocketListenRequest,
    ) -> Result<ListenResult, RuntimeError> {
        let encoded = encode_socket_listen_request(request);
        let state = &*self.inner;
        let method = MethodId::new(WEB_SOCKET_SERVICE_SERVER_LISTEN_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_listen_result(&reply)
    }

    /// Invokes `server_close` for the given handle and decodes the reply as [`Empty`].
    ///
    /// # Errors
    /// Returns the RPC failure, or the decode error if the reply is malformed.
    pub async fn server_close(&self, request: &ResourceHandle) -> Result<Empty, RuntimeError> {
        let encoded = encode_resource_handle(request);
        let state = &*self.inner;
        let method = MethodId::new(WEB_SOCKET_SERVICE_SERVER_CLOSE_METHOD_ID);
        let reply = state.client.call(state.instance_id, method, encoded).await?;
        decode_empty(&reply)
    }

    /// Returns a receiver for this instance's WebSocket event notifications.
    ///
    /// The first call creates a broadcast channel (capacity 128) and spawns a
    /// task that decodes each incoming notification payload and forwards the
    /// result to the channel. Later calls on any clone of this client only add
    /// another receiver to that channel.
    ///
    /// # Panics
    /// Panics if the sender mutex is poisoned. `tokio::spawn` also panics when
    /// the first call happens outside a Tokio runtime.
    pub fn subscribe_event(&self) -> WebSocketServiceEventReceiver {
        let mut slot = self
            .inner
            .event_sender
            .lock()
            .expect("notification sender mutex poisoned");
        let sender = slot.get_or_insert_with(|| {
            let (tx, _) = broadcast::channel(128);
            let forward_tx = tx.clone();
            let mut source = self.inner.client.subscribe_notifications(
                Some(self.inner.instance_id),
                Some(WEB_SOCKET_SERVICE_EVENT_NOTIFICATION_ID),
            );
            tokio::spawn(async move {
                while let Some(notification) = source.recv().await {
                    // A send error only means nobody is subscribed right now;
                    // the event is dropped on purpose.
                    let _ = forward_tx.send(decode_web_socket_event(&notification.payload));
                }
            });
            tx
        });
        WebSocketServiceEventReceiver {
            inner: sender.subscribe(),
        }
    }
}
/// Receiving end of the WebSocket event stream handed out by
/// `WebSocketServiceClient::subscribe_event`.
pub struct WebSocketServiceEventReceiver {
// Underlying broadcast receiver carrying decoded events or decode errors.
inner: broadcast::Receiver<Result<WebSocketEvent, RuntimeError>>,
}
impl WebSocketServiceEventReceiver {
pub async fn recv(&mut self) -> Option<Result<WebSocketEvent, RuntimeError>> {
match self.inner.recv().await {
Ok(value) => Some(value),
Err(broadcast::error::RecvError::Closed) => None,
Err(broadcast::error::RecvError::Lagged(skipped)) => Some(Err(RuntimeError::runtime(
RuntimeErrorCode::InternalRuntimeError,
format!("notification receiver lagged by {skipped} messages"),
))),
}
}
}