1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
//! Skyway-webrtc-gateway-api crate was implemented as a simple REST API Wrapper for SkyWay WebRTC Gateway.
//! The crate is not implemented for domain knowledge such as event subscribe,
//! and can only be used by those who are familiar with the behavior of SkyWay WebRTC Gateway.
//!
//! This crate is EASY one for using SkyWay WebRTC Gateway.
//! The user of this crate can use the SkyWay WebRTC Gateway by simply exchanging a few JSON messages with each of the Sender and Reciever channels.
// # プログラムの全体構造
// SkyWay WebRTC GatewayをコントロールするためのJSONメッセージのやりとりは、
// skyway-webrtc-gateway-api crateを利用することでRustから実施することができるが、
// イベントのsubscribeなどロジック部分はむき出しであり、SkyWay WebRTC Gatewayの挙動を熟知していないと利用できない。
// このcrateでは、ロジックの隠蔽を行う。
// 操作指示用のSenderを1つ、イベント受信用のReceiverを1つ提供し、これらを通じてメッセージをやり取りするだけで操作できるようにする。
// 内部構造はドメイン駆動の考え方に基づき整理する。
// また、crate全体を通してステートレスな設計にし、将来Stateが必要になった場合もcontrol関数内のfoldのみが保持するよう設計する。
// ## Presentation層
// Presentation層の役割を果たすのは、
// 操作メッセージを与えるためのtokio::sync::mpsc::channel::Senderと
// イベントを受け取るためのtokio::sync::mpsc::channel::Receiverである
// 本crateの要件はこれだけで満たせるので、lib.rs内で生成して返すのみである。
// メッセージを受け取るためにNetworkやROSなどのAPI化を行うためにラッピングする
// crate利用者側の際コードが実質的にPresentation層の役割を果たす
//
// ### End-User Application <--> Presentation層間の通信
// 上記のSender, Receiverでメッセージのやり取りをすることで行う
// メッセージの実体はApplication層で定義されるDTOで、
// SenderにはServiceParams(及び操作の`一次的な結果`を受け取るためのtokio::sync::oneshot)が与えられ、
// ReceiverからはResponseMessageが返される。
// (SkyWay WebRTC Gatewayは、APIに対して処理依頼を行った際に、長時間の処理が必要な場合は値を即時返さない仕様になっている。
// その場合、まずは処理開始できるかどうかだけが返される。上記の`一次的な結果`とはこのメッセージを指す。
// 短時間で処理が完了するAPI callの場合は、`一次的な結果`のみで完結する。)
// これらの与えられた値がJSONのフォーマットを取っていることは、Rustの型システムによって保証されているので、
// Presentation層ではチェックを行わない。
// ## Application層
// src/application以下に配置する
// SkyWay WebRTC Gateawyの各APIに対応するUseCaseと、
// 受け取ったメッセージに対応する適切なUseCase objectを生成するcreatorを実装する。
// 各UseCaseは、application/usecase/service.rsで定義されるService traitに従い実装される。
//
// ### Presentation層 <--> Application層間の通信
// #### 操作指示:
// Presentation層として生成されるSenderからメッセージを受け取る。
// このメッセージはapplication::runに渡され、対応したUseCase objectが生成され、実行される
// End-Userから受け取ったJSONメッセージが各UseCaseに適合したものかどうかはApplication層でチェックし、
// 間違っている場合はErrorメッセージを返す。
// 正しい場合はパラメータを取り出し、Domain層に与える。
// #### イベント:
// イベント監視を開始すべきタイミングをEnd-Userが意識しなくてすむよう、lib.rs内で自動的にイベント監視を開始する。
// イベント監視はevent UseCaseの実行という形で行い、その戻り値はPresentation層のReceiverを通してEnd-Userに返される。
// ## Domain層
// SkyWay WebRTC Gatewayを操作するためのドメイン知識を実装する。
// SkyWay WebRTC GatewayのAPIは大きく/peer, /data, /mediaに分かれているので、
// それぞれのAPIに対応するコードを格納するディレクトリとしてdomain/peer, domain/data, domain/mediaがあり、
// これらの中で共通的に利用されるコードはdomain/commonに格納される。
// ドメイン知識として、SkyWay WebRTC Gatewayの各APIと、それらが利用する値のフォーマットを有する。
// 各APIの機能は、それぞれのディレクトリ内のservice.rs内でtraitとして定義する。
// Application層から与えられたパラメータのチェックは、Domain Objectに与えることでなされる。
//
// このチェックはskyway-webrtc-gateway-api crateで実装されているので、それを内部的に利用する。
// そのためDomain Objectの多くは、skyway-webrtc-gateway-api crate内で定義されている。
// skyway-webrtc-gateway-api crateに対する直接的な依存は、infra層を除けば、
// lib.rs内での初期化と、これらのDomain Objectのみである。
// (domain/*/value_object.rs内のみに留め、pub useする形で自身のobjectとして利用する)
//
// ### Application層 <--> Domain層間の通信
// 各UseCaseが、与えられたJSONメッセージからパラメータを取り出し、対応するDomain層のobjectに与える。
// ## Infra層
// skyway-webrtc-gateway-api crateに依存しており、APIを直接叩く。
//
// ### Domain層 <--> Infra層間の通信
// Domain層はDomain ObjectをInfra層の関数に与え、
// Infra層はskyway-webrtc-gateway-api crateのAPIから返される戻り値をDomain Objectに変換して返す。
use futures::stream::StreamExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use crate::application::dto::response_message::ResponseResult;
use crate::presentation::serialize_service_params;
pub(crate) mod application;
pub(crate) mod di;
pub(crate) mod domain;
/// Error definition in this crate.
pub mod error;
pub(crate) mod infra;
/// A "prelude" for crates using this crate.
pub mod prelude;
pub(crate) mod presentation;
// Presentation層としてchannelを生成し、Application層以降のパイプラインを組み上げる関数。
// 外部から直接的に呼ばれるのはこの関数のみである。
//
// なお、Unit Testは行わずIntegration Testでのみテストを行う
/// Start WebRTC Gateway operation.
/// It provides a Sender to give JSON messages for operation and a Receiver to pass events.
pub async fn run(
base_url: &str,
) -> (
mpsc::Sender<(oneshot::Sender<String>, String)>,
mpsc::Receiver<String>,
) {
// skyway-webrtc-gateway crateにbase_urlを与え、初期化する
skyway_webrtc_gateway_api::initialize(base_url);
// End-Userに渡すSenderの生成
// End-UserはServiceParamsと、oneshotチャネルをこのSenderで与える。
// 本crateはServiceParamsに対応したUseCaseでの処理を開始し、`一次的な結果`をoneshotチャネルへ返す。
let (message_tx, message_rx) = mpsc::channel::<(oneshot::Sender<String>, String)>(10);
// End-Userに渡すReceiverの生成
// UseCaseでの処理の結果が`一次的な結果`に留まらず、副作用としてイベント監視の必要性が生じた場合は、
// このReceiverを介してイベントをEnd-Userに返す。
// TODO: タイムアウトの仕様を検討する
let (event_tx, event_rx) = mpsc::channel::<ResponseResult>(10);
// Senderの監視を開始する。
// 副作用としてイベントを返すケースのため、event_txも渡す
// (例: peer objectを生成したらpeer eventの監視を合わせて開始する)
tokio::spawn(skyway_control_service_observe(message_rx, event_tx));
// Presentation層の責務として、ObjectをJSONメッセージに変換して返す
let mut event_rx =
ReceiverStream::new(event_rx).map(|params| presentation::serialize_service_params(¶ms));
let (tx, rx) = mpsc::channel::<String>(1000);
tokio::spawn(async move {
while let Some(item) = event_rx.next().await {
if tx.send(item).await.is_err() {
break;
}
}
});
(message_tx, rx)
}
// End-Userからのメッセージ(ServiceParams)を監視し続ける
// これはEnd-UserがSenderが破棄するまで続ける。
// crate全体を通してステートレスに設計し、将来Stateが必要になった場合もこの関数内のfoldのみに留める
//
// なお、Unit Testは行わずIntegration Testでのみテストを行う
async fn skyway_control_service_observe(
receiver: mpsc::Receiver<(oneshot::Sender<String>, String)>,
event_tx: mpsc::Sender<ResponseResult>,
) {
// FIXME
// jsonをどんどん受け取る
let receiver = ReceiverStream::new(receiver);
receiver
.fold(
event_tx,
|event_tx, (message_response_tx, message)| async move {
// JSONをパースし、アプリケーション層に渡す
// このJSONは呼び出されるべきサービスの情報を含んでおり、アプリケーション層で適切に呼び出す
let result = presentation::format_input_json(&message).await;
// jsonのパースに失敗した場合はエラーを返す
if let Err(e) = result {
let message = ResponseResult::Error(format!(
r#"
{:?}
"#,
e
));
let _ = message_response_tx.send(serialize_service_params(&message));
return event_tx;
}
let message = result.unwrap();
let result = application::run(message).await;
// oneshot channelを介してサービス実行によって得られた `一次的な結果` を返す。
// サービスの実行結果がエラーの場合でも、エラーを示すJSONメッセージが返される(ResponseMessage::ERROR)のでそのままPresentation層へ渡す
let _ = message_response_tx.send(serialize_service_params(&result));
// イベントを監視する必要が生じた場合は、イベントの監視を開始する
// まずイベント監視する必要があるのは、サービス実行に成功したケースのみである
if let ResponseResult::Success(message) = result {
// event factoryに渡し、監視サービスが生成された場合
if let Some((value, service)) =
application::usecase::factory::event_factory(message)
{
// event_txをイベント監視スレッドにmoveし、監視を開始する
let tx = event_tx.clone();
tokio::spawn(async move {
service.execute(tx, value).await;
});
}
}
event_tx
},
)
.await;
}