cal_jambonz_actix_web/
lib.rs1mod handler;
2
3use actix_web::dev::Server;
4use actix_web::http::header::{HeaderName, HeaderValue};
5use actix_web::web::{Data, Payload};
6use actix_web::{App, Error, HttpRequest, HttpResponse, HttpServer, rt, web};
7use actix_ws::Session;
8use cal_jambonz::ws::WebsocketRequest;
9use std::pin::Pin;
10use std::sync::Arc;
11use uuid::Uuid;
12
13pub type HandlerFn<T> =
14 Arc<dyn Fn(HandlerContext<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
15
16pub fn register_handler<T, F, Fut>(handler: F) -> HandlerFn<T>
17where
18 T: Clone + Send + Sync + 'static,
19 F: Fn(HandlerContext<T>) -> Fut + Send + Sync + 'static,
20 Fut: Future<Output = ()> + Send + 'static,
21{
22 Arc::new(move |ctx: HandlerContext<T>| Box::pin(handler(ctx)))
23}
24
25async fn handle_record<T: 'static + Clone>(
26 req: HttpRequest,
27 stream: Payload,
28 state: Data<JambonzState<T>>,
29) -> Result<HttpResponse, Error> {
30 ws_response(&req, stream, state, "audio.jambonz.org")
31}
32
33async fn handle_ws<T: 'static + Clone>(
34 req: HttpRequest,
35 stream: Payload,
36 state: Data<JambonzState<T>>,
37) -> Result<HttpResponse, Error> {
38 ws_response(&req, stream, state, "ws.jambonz.org")
39}
40
41fn ws_response<T: 'static + Clone>(
42 req: &HttpRequest,
43 stream: Payload,
44 state: Data<JambonzState<T>>,
45 protocol: &str,
46) -> Result<HttpResponse, Error> {
47 match actix_ws::handle(req, stream) {
48 Ok(res) => {
49 let (mut res, session, msg_stream) = res;
50 rt::spawn(handler::handler(session, msg_stream, state));
51 let ws_header = HeaderName::from_bytes("Sec-WebSocket-Protocol".to_string().as_bytes())
52 .expect("should be valid WebSocket-Protocol");
53 res.headers_mut().insert(
54 ws_header.clone(),
55 HeaderValue::from_bytes(protocol.as_bytes())?,
56 );
57 Ok(res)
58 }
59 Err(e) => {
60 println!("{:?}", e);
61 Ok(HttpResponse::InternalServerError().finish())
62 }
63 }
64}
65
66pub fn start_jambonz_server<T>(server: JambonzWebServer<T>, handler: HandlerFn<T>) -> Server
67where
68 T: Clone + Send + 'static,
69{
70 let state = JambonzState {
71 message: "Hello".to_string(),
72 state: server.app_state.clone(),
73 handler: handler.clone(),
74 };
75
76 HttpServer::new(move || {
77 let d1 = Data::new(state.clone());
78
79 App::new()
80 .app_data(d1)
81 .route(
82 server.ws_path.clone().as_str(),
83 web::get().to(handle_ws::<T>),
84 )
85 .route(
86 server.record_path.clone().as_str(),
87 web::get().to(handle_record::<T>),
88 )
89 })
90 .bind((server.bind_ip, server.bind_port))
91 .expect("Can not bind to server/port")
92 .run()
93}
94
95#[derive(Clone)]
96pub struct JambonzState<T> {
97 pub message: String,
98 pub state: T,
99 pub handler: HandlerFn<T>,
100}
101
102pub enum JambonzRequest {
103 TextMessage(WebsocketRequest),
104 Binary(Vec<u8>),
105 Close,
106}
107
/// Configuration for the Jambonz web server: listen address, route paths and
/// the application state shared with handlers.
pub struct JambonzWebServer<T> {
    /// Address to bind to; defaults to `0.0.0.0`.
    pub bind_ip: String,
    /// Port to bind to; defaults to `8080`.
    pub bind_port: u16,
    /// Caller-supplied application state.
    pub app_state: T,
    /// Path of the WebSocket route; defaults to `/ws`.
    pub ws_path: String,
    /// Path of the recording route; defaults to `/record`.
    pub record_path: String,
}

impl<T> JambonzWebServer<T> {
    /// Creates a configuration with the default address, port and paths
    /// around the given application state.
    pub fn new(app_state: T) -> Self {
        Self {
            bind_ip: String::from("0.0.0.0"),
            bind_port: 8080,
            app_state,
            ws_path: String::from("/ws"),
            record_path: String::from("/record"),
        }
    }

    /// Returns the configuration with `bind_ip` replaced.
    pub fn with_bind_ip(self, ip: impl Into<String>) -> Self {
        Self {
            bind_ip: ip.into(),
            ..self
        }
    }

    /// Returns the configuration with `bind_port` replaced.
    pub fn with_bind_port(self, port: u16) -> Self {
        Self {
            bind_port: port,
            ..self
        }
    }

    /// Returns the configuration with `ws_path` replaced.
    pub fn with_ws_path(self, path: impl Into<String>) -> Self {
        Self {
            ws_path: path.into(),
            ..self
        }
    }

    /// Returns the configuration with `record_path` replaced.
    pub fn with_record_path(self, path: impl Into<String>) -> Self {
        Self {
            record_path: path.into(),
            ..self
        }
    }
}
147
148pub struct HandlerContext<T> {
149 pub uuid: Uuid,
150 pub session: Session,
151 pub request: JambonzRequest,
152 pub state: T,
153}