Skip to main content

syncular_testkit/
http.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::io::{BufRead, BufReader, Read, Write};
3use std::net::{SocketAddr, TcpListener, TcpStream};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::mpsc::{self, Sender};
6use std::sync::{Arc, Mutex};
7use std::thread::{self, JoinHandle};
8use std::time::{Duration, Instant};
9
10use serde::Deserialize;
11use serde_json::{json, Value};
12use syncular_runtime::error::Result;
13use syncular_runtime::protocol::{
14    AuthLeaseProvenance, CombinedRequest, CombinedResponse, PushBatchRequest, PushCommitRequest,
15    SyncOperation,
16};
17use syncular_runtime::transport::{SyncAuthHeaders, SyncTransport};
18use tungstenite::{http::StatusCode, Message};
19
20use crate::app_server::AppTestServer;
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct TestHttpRequest {
24    pub method: String,
25    pub path: String,
26    pub headers: BTreeMap<String, String>,
27    pub body: String,
28}
29
30impl TestHttpRequest {
31    pub fn json(&self) -> Option<Value> {
32        serde_json::from_str(&self.body).ok()
33    }
34
35    pub fn header(&self, name: &str) -> Option<&str> {
36        self.headers
37            .get(&name.to_ascii_lowercase())
38            .map(String::as_str)
39    }
40}
41
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub struct TestHttpResponse {
44    pub status: u16,
45    pub reason: String,
46    pub content_type: String,
47    pub body: String,
48}
49
50impl TestHttpResponse {
51    pub fn json(body: Value) -> Self {
52        Self {
53            status: 200,
54            reason: "OK".to_string(),
55            content_type: "application/json".to_string(),
56            body: body.to_string(),
57        }
58    }
59
60    pub fn status(status: u16, reason: impl Into<String>, body: impl Into<String>) -> Self {
61        Self {
62            status,
63            reason: reason.into(),
64            content_type: "text/plain".to_string(),
65            body: body.into(),
66        }
67    }
68
69    pub fn sync(response: CombinedResponse) -> Self {
70        Self::json(serde_json::to_value(response).expect("combined response JSON"))
71    }
72
73    pub fn auth_expired() -> Self {
74        Self::status(401, "Unauthorized", "expired token")
75    }
76}
77
/// Mutable state shared between a test HTTP server handle and its worker thread(s).
#[derive(Debug, Default)]
struct TestHttpState {
    /// Every request recorded so far, in arrival order.
    requests: Vec<TestHttpRequest>,
    /// Queued responses; `handle_connection` pops from the front for each request.
    responses: VecDeque<TestHttpResponse>,
}
83
/// Scripted HTTP server that records requests and replies with queued responses.
pub struct TestSyncServer {
    /// Sync endpoint URL, built as `http://{addr}/sync` when the server is spawned.
    url: String,
    /// Flag polled by the accept loop; set to `true` on drop to stop the worker.
    stop: Arc<AtomicBool>,
    /// Recorded requests and the remaining queued responses.
    state: Arc<Mutex<TestHttpState>>,
    /// Handle of the accept-loop thread, joined on drop.
    join: Option<JoinHandle<()>>,
}
90
/// Shared list of notification senders, one pushed per accepted WebSocket
/// connection; `broadcast_realtime_sync` sends `()` to each of them.
type WsBroadcaster = Arc<Mutex<Vec<Sender<()>>>>;
92
/// JSON message accepted over the test WebSocket; fields are read in camelCase.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TestWsPushMessage {
    /// Message discriminator, read from the JSON key `type`; only `"push"` is handled.
    #[serde(rename = "type")]
    message_type: String,
    /// Echoed back as `requestId` in the push response.
    request_id: String,
    /// Forwarded as the commit's `client_commit_id`.
    client_commit_id: String,
    /// Operations forwarded in the single pushed commit.
    operations: Vec<SyncOperation>,
    /// Forwarded as the commit's `schema_version`.
    schema_version: i32,
    /// Optional auth lease; absent in the JSON means `None`.
    #[serde(default)]
    auth_lease: Option<AuthLeaseProvenance>,
}
105
/// HTTP and WebSocket front end for an [`AppTestServer`], bound to a loopback port.
pub struct AppTestHttpServer {
    /// Address the listener is bound to.
    addr: SocketAddr,
    /// Backing application server that answers sync requests.
    app_server: AppTestServer,
    /// Senders used to notify connected WebSocket clients.
    broadcaster: WsBroadcaster,
    /// Requests recorded by the connection handlers.
    state: Arc<Mutex<TestHttpState>>,
    /// Flag polled by the accept loop and WebSocket loops; set on drop.
    stop: Arc<AtomicBool>,
    /// Handle of the accept-loop thread, joined on drop.
    join: Option<JoinHandle<()>>,
}
114
/// An HTTP request captured by [`TestBlobServer`], with the body kept as raw bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestBlobHttpRequest {
    /// Method token taken from the request line.
    pub method: String,
    /// Request target taken from the request line.
    pub path: String,
    /// Header values by name; [`TestBlobHttpRequest::header`] looks names up
    /// in lower case, so keys are expected to be lower-cased.
    pub headers: BTreeMap<String, String>,
    /// Raw request body.
    pub body: Vec<u8>,
}

impl TestBlobHttpRequest {
    /// Returns the value stored for `name`, lower-casing `name` (ASCII) before
    /// the lookup.
    pub fn header(&self, name: &str) -> Option<&str> {
        let key = name.to_ascii_lowercase();
        let value = self.headers.get(&key)?;
        Some(value.as_str())
    }
}
130
/// Configuration for a [`TestBlobServer`].
#[derive(Debug, Clone)]
pub struct TestBlobServerOptions {
    /// Blob content served from `download_path`.
    pub bytes: Vec<u8>,
    /// Blob hash used to build the `/sync/blobs/{hash}/...` routes.
    pub hash: String,
    /// Path advertised as the upload target.
    pub upload_path: String,
    /// Path advertised as the download target.
    pub download_path: String,
    /// Value advertised for the `x-upload-token` upload header.
    pub upload_token: String,
}

impl TestBlobServerOptions {
    /// Creates options for `bytes`/`hash` with the default paths and token.
    pub fn new(bytes: Vec<u8>, hash: impl Into<String>) -> Self {
        let hash = hash.into();
        Self {
            bytes,
            hash,
            upload_path: String::from("/upload-target"),
            download_path: String::from("/download-target"),
            upload_token: String::from("upload-token"),
        }
    }

    /// Replaces the upload path, returning the updated options.
    pub fn upload_path(self, upload_path: impl Into<String>) -> Self {
        Self {
            upload_path: upload_path.into(),
            ..self
        }
    }

    /// Replaces the download path, returning the updated options.
    pub fn download_path(self, download_path: impl Into<String>) -> Self {
        Self {
            download_path: download_path.into(),
            ..self
        }
    }

    /// Replaces the upload token, returning the updated options.
    pub fn upload_token(self, upload_token: impl Into<String>) -> Self {
        Self {
            upload_token: upload_token.into(),
            ..self
        }
    }
}
166
/// Mutable state shared between a [`TestBlobServer`] handle and its worker thread.
#[derive(Debug, Default)]
struct TestBlobState {
    /// Every blob request recorded so far, in arrival order.
    requests: Vec<TestBlobHttpRequest>,
}
171
/// Loopback HTTP server that answers the blob upload/download routes for one blob.
pub struct TestBlobServer {
    /// Address the listener is bound to.
    addr: SocketAddr,
    /// Flag polled by the accept loop; set to `true` on drop to stop the worker.
    stop: Arc<AtomicBool>,
    /// Requests recorded by the worker thread.
    state: Arc<Mutex<TestBlobState>>,
    /// Handle of the accept-loop thread, joined on drop.
    join: Option<JoinHandle<()>>,
}
178
179impl TestBlobServer {
180    pub fn start(bytes: Vec<u8>, hash: impl Into<String>) -> Result<Self> {
181        Self::start_with_options(TestBlobServerOptions::new(bytes, hash))
182    }
183
184    pub fn start_with_options(options: TestBlobServerOptions) -> Result<Self> {
185        let listener = TcpListener::bind("127.0.0.1:0")?;
186        listener.set_nonblocking(true)?;
187        let addr = listener.local_addr()?;
188        let stop = Arc::new(AtomicBool::new(false));
189        let state = Arc::new(Mutex::new(TestBlobState::default()));
190        let thread_stop = stop.clone();
191        let thread_state = state.clone();
192        let join = thread::spawn(move || {
193            while !thread_stop.load(Ordering::Relaxed) {
194                match listener.accept() {
195                    Ok((stream, _)) => {
196                        handle_blob_connection(stream, addr, &options, &thread_state);
197                    }
198                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
199                        thread::sleep(Duration::from_millis(5));
200                    }
201                    Err(_) => break,
202                }
203            }
204        });
205        Ok(Self {
206            addr,
207            stop,
208            state,
209            join: Some(join),
210        })
211    }
212
213    pub fn addr(&self) -> SocketAddr {
214        self.addr
215    }
216
217    pub fn sync_base_url(&self) -> String {
218        format!("http://{}/sync", self.addr)
219    }
220
221    pub fn requests(&self) -> Vec<TestBlobHttpRequest> {
222        self.state
223            .lock()
224            .expect("test blob server state")
225            .requests
226            .clone()
227    }
228
229    pub fn wait_for_requests(
230        &self,
231        expected: usize,
232        timeout: Duration,
233    ) -> Vec<TestBlobHttpRequest> {
234        let deadline = Instant::now() + timeout;
235        loop {
236            let requests = self.requests();
237            if requests.len() >= expected || Instant::now() >= deadline {
238                return requests;
239            }
240            thread::sleep(Duration::from_millis(5));
241        }
242    }
243}
244
245impl Drop for TestBlobServer {
246    fn drop(&mut self) {
247        self.stop.store(true, Ordering::Relaxed);
248        let _ = TcpStream::connect(self.addr);
249        if let Some(join) = self.join.take() {
250            let _ = join.join();
251        }
252    }
253}
254
255impl TestSyncServer {
256    pub fn spawn(responses: impl IntoIterator<Item = TestHttpResponse>) -> Result<Self> {
257        let listener = TcpListener::bind("127.0.0.1:0")?;
258        listener.set_nonblocking(true)?;
259        let addr = listener.local_addr()?;
260        let stop = Arc::new(AtomicBool::new(false));
261        let state = Arc::new(Mutex::new(TestHttpState {
262            requests: Vec::new(),
263            responses: responses.into_iter().collect(),
264        }));
265        let thread_stop = stop.clone();
266        let thread_state = state.clone();
267        let join = thread::spawn(move || {
268            while !thread_stop.load(Ordering::Relaxed) {
269                match listener.accept() {
270                    Ok((mut stream, _)) => handle_connection(&mut stream, &thread_state),
271                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
272                        thread::sleep(Duration::from_millis(5));
273                    }
274                    Err(_) => break,
275                }
276            }
277        });
278        Ok(Self {
279            url: format!("http://{addr}/sync"),
280            stop,
281            state,
282            join: Some(join),
283        })
284    }
285
286    pub fn empty_success() -> Result<Self> {
287        Self::spawn([empty_success_response()])
288    }
289
290    pub fn sync_responses(responses: impl IntoIterator<Item = CombinedResponse>) -> Result<Self> {
291        Self::spawn(responses.into_iter().map(TestHttpResponse::sync))
292    }
293
294    pub fn status(status: u16, reason: impl Into<String>, body: impl Into<String>) -> Result<Self> {
295        Self::spawn([TestHttpResponse::status(status, reason, body)])
296    }
297
298    pub fn url(&self) -> String {
299        self.url.clone()
300    }
301
302    pub fn requests(&self) -> Vec<TestHttpRequest> {
303        self.state
304            .lock()
305            .expect("test http server state")
306            .requests
307            .clone()
308    }
309
310    pub fn request_jsons(&self) -> Vec<Value> {
311        self.requests()
312            .into_iter()
313            .filter_map(|request| request.json())
314            .collect()
315    }
316
317    pub fn wait_for_requests(&self, expected: usize, timeout: Duration) -> Vec<TestHttpRequest> {
318        let deadline = Instant::now() + timeout;
319        loop {
320            let requests = self.requests();
321            if requests.len() >= expected || Instant::now() >= deadline {
322                return requests;
323            }
324            thread::sleep(Duration::from_millis(5));
325        }
326    }
327
328    pub fn push_response(&self, response: TestHttpResponse) {
329        self.state
330            .lock()
331            .expect("test http server state")
332            .responses
333            .push_back(response);
334    }
335
336    pub fn push_sync_response(&self, response: CombinedResponse) {
337        self.push_response(TestHttpResponse::sync(response));
338    }
339
340    pub fn push_json_response(&self, body: Value) {
341        self.push_response(TestHttpResponse::json(body));
342    }
343}
344
345impl Drop for TestSyncServer {
346    fn drop(&mut self) {
347        self.stop.store(true, Ordering::Relaxed);
348        let _ = TcpStream::connect(
349            self.url
350                .trim_start_matches("http://")
351                .trim_end_matches("/sync"),
352        );
353        if let Some(join) = self.join.take() {
354            let _ = join.join();
355        }
356    }
357}
358
359impl AppTestHttpServer {
360    pub fn start(app_schema: syncular_runtime::app_schema::AppSchema) -> Result<Self> {
361        Self::start_with_server(AppTestServer::new(app_schema))
362    }
363
364    pub fn start_with_server(app_server: AppTestServer) -> Result<Self> {
365        let listener = TcpListener::bind("127.0.0.1:0")?;
366        listener.set_nonblocking(true)?;
367        let addr = listener.local_addr()?;
368        let broadcaster: WsBroadcaster = Arc::new(Mutex::new(Vec::new()));
369        let state = Arc::new(Mutex::new(TestHttpState::default()));
370        let stop = Arc::new(AtomicBool::new(false));
371        let thread_stop = stop.clone();
372        let thread_server = app_server.clone();
373        let thread_broadcaster = broadcaster.clone();
374        let thread_state = state.clone();
375        let join = thread::spawn(move || {
376            while !thread_stop.load(Ordering::Relaxed) {
377                match listener.accept() {
378                    Ok((stream, _)) => {
379                        if thread_stop.load(Ordering::Relaxed) {
380                            break;
381                        }
382                        let _ = stream.set_nonblocking(false);
383                        let connection_server = thread_server.clone();
384                        let connection_broadcaster = thread_broadcaster.clone();
385                        let connection_stop = thread_stop.clone();
386                        let connection_state = thread_state.clone();
387                        thread::spawn(move || {
388                            handle_app_test_http_connection(
389                                stream,
390                                &connection_server,
391                                &connection_broadcaster,
392                                &connection_state,
393                                &connection_stop,
394                            );
395                        });
396                    }
397                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
398                        thread::sleep(Duration::from_millis(5));
399                    }
400                    Err(_) => break,
401                }
402            }
403        });
404
405        Ok(Self {
406            addr,
407            app_server,
408            broadcaster,
409            state,
410            stop,
411            join: Some(join),
412        })
413    }
414
415    pub fn addr(&self) -> SocketAddr {
416        self.addr
417    }
418
419    pub fn url(&self) -> String {
420        format!("http://{}/sync", self.addr)
421    }
422
423    pub fn realtime_url(&self, client_id: &str) -> String {
424        format!(
425            "ws://{}/sync/realtime?clientId={}",
426            self.addr,
427            client_id.replace(' ', "%20")
428        )
429    }
430
431    pub fn app_server(&self) -> &AppTestServer {
432        &self.app_server
433    }
434
435    pub fn requests(&self) -> Vec<TestHttpRequest> {
436        self.state
437            .lock()
438            .expect("app test HTTP server state")
439            .requests
440            .clone()
441    }
442
443    pub fn wait_for_requests(&self, expected: usize, timeout: Duration) -> Vec<TestHttpRequest> {
444        let deadline = Instant::now() + timeout;
445        loop {
446            let requests = self.requests();
447            if requests.len() >= expected || Instant::now() >= deadline {
448                return requests;
449            }
450            thread::sleep(Duration::from_millis(5));
451        }
452    }
453
454    pub fn push_realtime_sync(&self) {
455        broadcast_realtime_sync(&self.broadcaster);
456    }
457}
458
459impl Drop for AppTestHttpServer {
460    fn drop(&mut self) {
461        let _ = self.broadcaster.lock().map(|peers| peers.len());
462        self.stop.store(true, Ordering::Relaxed);
463        let _ = TcpStream::connect(self.addr);
464        if let Some(join) = self.join.take() {
465            let _ = join.join();
466        }
467    }
468}
469
470pub fn empty_success_response() -> TestHttpResponse {
471    TestHttpResponse::json(json!({
472        "ok": true,
473        "push": null,
474        "pull": {
475            "ok": true,
476            "subscriptions": []
477        }
478    }))
479}
480
/// Returns `hash` with every `:` replaced by `%3A`; all other characters are
/// left untouched.
pub fn encoded_blob_hash(hash: &str) -> String {
    let segments: Vec<&str> = hash.split(':').collect();
    segments.join("%3A")
}
484
485fn handle_app_test_http_connection(
486    mut stream: TcpStream,
487    server: &AppTestServer,
488    broadcaster: &WsBroadcaster,
489    state: &Arc<Mutex<TestHttpState>>,
490    stop: &Arc<AtomicBool>,
491) {
492    if is_websocket_request(&stream) {
493        handle_app_test_ws_connection(stream, server, broadcaster, state, stop);
494        return;
495    }
496
497    let request = read_http_request(&mut stream);
498    if request.body.is_empty() && request.method.is_empty() {
499        return;
500    }
501    record_app_test_http_request(state, request.clone());
502    if request.method != "POST" || request.path != "/sync" {
503        write_http_response(
504            &mut stream,
505            TestHttpResponse::status(404, "Not Found", "not found"),
506        );
507        return;
508    }
509    let auth_headers = auth_headers_from_request(&request);
510    server.record_auth_headers(auth_headers.clone());
511    if !server.is_authorized_headers(&auth_headers) {
512        write_http_response(
513            &mut stream,
514            TestHttpResponse::status(401, "Unauthorized", "unauthorized"),
515        );
516        return;
517    }
518
519    let response = (|| {
520        let request = serde_json::from_str::<CombinedRequest>(&request.body)?;
521        let before_commits = server.commits().len();
522        let response = server.post_sync(&request)?;
523        let broadcast = server.commits().len() > before_commits;
524        let body = serde_json::to_string(&response)?;
525        Ok::<_, syncular_runtime::error::SyncularError>((body, broadcast))
526    })();
527
528    match response {
529        Ok((body, broadcast)) => {
530            write_http_response(
531                &mut stream,
532                TestHttpResponse {
533                    status: 200,
534                    reason: "OK".to_string(),
535                    content_type: "application/json".to_string(),
536                    body,
537                },
538            );
539            if broadcast {
540                broadcast_realtime_sync(broadcaster);
541            }
542        }
543        Err(error) => {
544            write_http_response(
545                &mut stream,
546                TestHttpResponse {
547                    status: 500,
548                    reason: "Internal Server Error".to_string(),
549                    content_type: "application/json".to_string(),
550                    body: json!({ "error": error.to_string() }).to_string(),
551                },
552            );
553        }
554    }
555}
556
/// Peeks at the first bytes of `stream` without consuming them and reports
/// whether they start with `GET`.
///
/// Returns `false` when the peek fails or yields fewer than three bytes.
fn is_websocket_request(stream: &TcpStream) -> bool {
    let mut prefix = [0u8; 4];
    let Ok(available) = stream.peek(&mut prefix) else {
        return false;
    };
    if available < 3 {
        return false;
    }
    prefix.starts_with(b"GET")
}
564
/// Serves one WebSocket connection of the app test server.
///
/// Performs the handshake (recording the request and rejecting unauthorized
/// clients with `401`), registers a notification channel with `broadcaster`,
/// and then loops until `stop` is set, the peer closes, or an error occurs:
/// pending notifications are forwarded as `{"event":"sync"}` messages and
/// incoming text messages are handled as push requests.
fn handle_app_test_ws_connection(
    stream: TcpStream,
    server: &AppTestServer,
    broadcaster: &WsBroadcaster,
    state: &Arc<Mutex<TestHttpState>>,
    stop: &Arc<AtomicBool>,
) {
    // Filled in by the handshake callback from the `clientId` query parameter.
    let mut client_id = String::new();
    let mut socket = match tungstenite::accept_hdr(
        stream,
        |request: &tungstenite::handshake::server::Request, response| {
            // Fall back to a fixed id when the URL carries no `clientId`.
            client_id = query_param(request.uri().query().unwrap_or_default(), "clientId")
                .unwrap_or_else(|| "app-test-ws-client".to_string());
            record_app_test_http_request(state, websocket_request_record(request));
            let auth_headers = auth_headers_from_ws_request(request);
            server.record_auth_headers(auth_headers.clone());
            if !server.is_authorized_headers(&auth_headers) {
                // Refuse the upgrade with a 401 response.
                return Err(tungstenite::handshake::server::Response::builder()
                    .status(StatusCode::UNAUTHORIZED)
                    .body(Some("unauthorized".to_string()))
                    .expect("unauthorized websocket response"));
            }
            Ok(response)
        },
    ) {
        Ok(socket) => socket,
        // Handshake failed or was rejected; nothing more to do.
        Err(_) => return,
    };
    // A short read timeout keeps the loop below polling `rx` and `stop`
    // instead of blocking indefinitely in `socket.read()`.
    set_ws_stream_read_timeout(&mut socket, Duration::from_millis(50));
    let (tx, rx) = mpsc::channel::<()>();
    // Register this connection so broadcasts reach it.
    broadcaster
        .lock()
        .expect("app test ws broadcaster")
        .push(tx);

    while !stop.load(Ordering::Relaxed) {
        // Forward every pending broadcast notification as a sync event.
        while rx.try_recv().is_ok() {
            if socket
                .send(Message::Text(json!({ "event": "sync" }).to_string().into()))
                .is_err()
            {
                let _ = socket.close(None);
                return;
            }
        }
        let message = match socket.read() {
            Ok(Message::Text(text)) => text.to_string(),
            Ok(Message::Ping(bytes)) => {
                let _ = socket.send(Message::Pong(bytes));
                continue;
            }
            Ok(Message::Close(_)) => break,
            // Other frame kinds are ignored.
            Ok(_) => continue,
            // Read timeout elapsed with no data: go back to polling.
            Err(tungstenite::Error::Io(error))
                if error.kind() == std::io::ErrorKind::WouldBlock
                    || error.kind() == std::io::ErrorKind::TimedOut =>
            {
                continue;
            }
            Err(_) => break,
        };

        let response = handle_ws_push_message(server, &client_id, &message);
        match response {
            Ok(response) => {
                if socket
                    .send(Message::Text(response.to_string().into()))
                    .is_err()
                {
                    break;
                }
                // Notify all registered peers (this connection's sender is in
                // the list too) after the push response was sent.
                broadcast_realtime_sync(broadcaster);
            }
            Err(error) => {
                // Report the failure to the client, then end the connection.
                let _ = socket.send(Message::Text(
                    json!({
                        "event": "error",
                        "data": { "message": error.to_string() }
                    })
                    .to_string()
                    .into(),
                ));
                break;
            }
        }
    }

    let _ = socket.close(None);
}
654
655fn record_app_test_http_request(state: &Arc<Mutex<TestHttpState>>, request: TestHttpRequest) {
656    state
657        .lock()
658        .expect("app test HTTP server state")
659        .requests
660        .push(request);
661}
662
663fn websocket_request_record(request: &tungstenite::handshake::server::Request) -> TestHttpRequest {
664    TestHttpRequest {
665        method: request.method().to_string(),
666        path: request
667            .uri()
668            .path_and_query()
669            .map(|path| path.as_str().to_string())
670            .unwrap_or_else(|| request.uri().path().to_string()),
671        headers: request
672            .headers()
673            .iter()
674            .filter_map(|(name, value)| {
675                Some((
676                    name.as_str().to_ascii_lowercase(),
677                    value.to_str().ok()?.to_string(),
678                ))
679            })
680            .collect(),
681        body: String::new(),
682    }
683}
684
685fn auth_headers_from_request(request: &TestHttpRequest) -> SyncAuthHeaders {
686    request
687        .headers
688        .iter()
689        .filter_map(|(name, value)| auth_header_entry(name, value))
690        .collect()
691}
692
693fn auth_headers_from_ws_request(
694    request: &tungstenite::handshake::server::Request,
695) -> SyncAuthHeaders {
696    request
697        .headers()
698        .iter()
699        .filter_map(|(name, value)| auth_header_entry(name.as_str(), value.to_str().ok()?))
700        .collect()
701}
702
/// Returns `(lower-cased name, value)` when the header counts as an auth
/// header: `authorization`, or any `x-syncular-*` header other than
/// `x-syncular-schema-version`. Returns `None` for every other header.
fn auth_header_entry(name: &str, value: &str) -> Option<(String, String)> {
    let lowered = name.to_ascii_lowercase();
    let forwarded = match lowered.as_str() {
        "authorization" => true,
        "x-syncular-schema-version" => false,
        other => other.starts_with("x-syncular-"),
    };
    if !forwarded {
        return None;
    }
    Some((lowered, value.to_string()))
}
713
714fn handle_ws_push_message(server: &AppTestServer, client_id: &str, message: &str) -> Result<Value> {
715    let message = serde_json::from_str::<TestWsPushMessage>(message)?;
716    if message.message_type != "push" {
717        return Err(syncular_runtime::error::SyncularError::protocol_message(
718            format!(
719                "unsupported websocket message type: {}",
720                message.message_type
721            ),
722        ));
723    }
724    let request_id = message.request_id;
725    let request = CombinedRequest {
726        client_id: client_id.to_string(),
727        push: Some(PushBatchRequest {
728            commits: vec![PushCommitRequest {
729                client_commit_id: message.client_commit_id,
730                operations: message.operations,
731                schema_version: message.schema_version,
732                auth_lease: message.auth_lease,
733            }],
734        }),
735        pull: None,
736    };
737    let combined = server.post_sync(&request)?;
738    let response = combined
739        .push
740        .and_then(|push| push.commits.into_iter().next())
741        .ok_or_else(|| {
742            syncular_runtime::error::SyncularError::protocol_message(
743                "missing websocket push response",
744            )
745        })?;
746    Ok(json!({
747        "event": "push-response",
748        "data": {
749            "requestId": request_id,
750            "clientCommitId": response.client_commit_id,
751            "status": response.status,
752            "commitSeq": response.commit_seq,
753            "results": response.results,
754        }
755    }))
756}
757
758fn broadcast_realtime_sync(broadcaster: &WsBroadcaster) {
759    let mut peers = broadcaster.lock().expect("app test ws broadcaster");
760    peers.retain(|sender| sender.send(()).is_ok());
761}
762
763fn set_ws_stream_read_timeout(socket: &mut tungstenite::WebSocket<TcpStream>, timeout: Duration) {
764    let _ = socket.get_mut().set_read_timeout(Some(timeout));
765}
766
/// Returns the value of the first `name=value` pair in `query` whose key
/// equals `name`, with every `%20` turned into a space.
///
/// Pairs without `=` are skipped; no other percent-escapes are decoded.
fn query_param(query: &str, name: &str) -> Option<String> {
    for pair in query.split('&') {
        let Some((key, value)) = pair.split_once('=') else {
            continue;
        };
        if key == name {
            return Some(value.replace("%20", " "));
        }
    }
    None
}
777
778fn handle_blob_connection(
779    stream: TcpStream,
780    addr: SocketAddr,
781    options: &TestBlobServerOptions,
782    state: &Arc<Mutex<TestBlobState>>,
783) {
784    let Ok((request, mut stream)) = read_blob_http_request(stream) else {
785        return;
786    };
787    let response = blob_response_for(&request, addr, options);
788    let _ = stream.write_all(response.as_slice());
789    state
790        .lock()
791        .expect("test blob server state")
792        .requests
793        .push(request);
794}
795
796fn read_blob_http_request(stream: TcpStream) -> std::io::Result<(TestBlobHttpRequest, TcpStream)> {
797    let mut reader = BufReader::new(stream);
798    let mut request_line = String::new();
799    reader.read_line(&mut request_line)?;
800    let mut request_parts = request_line.split_whitespace();
801    let method = request_parts.next().unwrap_or_default().to_string();
802    let path = request_parts.next().unwrap_or("/").to_string();
803    let mut headers = BTreeMap::new();
804    loop {
805        let mut line = String::new();
806        reader.read_line(&mut line)?;
807        if line == "\r\n" || line.is_empty() {
808            break;
809        }
810        if let Some((name, value)) = line.split_once(':') {
811            headers.insert(name.trim().to_ascii_lowercase(), value.trim().to_string());
812        }
813    }
814    let content_length = headers
815        .get("content-length")
816        .and_then(|value| value.parse::<usize>().ok())
817        .unwrap_or(0);
818    let body = if headers
819        .get("transfer-encoding")
820        .is_some_and(|value| value.to_ascii_lowercase().contains("chunked"))
821    {
822        read_chunked_body(&mut reader)?
823    } else {
824        let mut body = vec![0u8; content_length];
825        reader.read_exact(&mut body)?;
826        body
827    };
828    let stream = reader.into_inner();
829    Ok((
830        TestBlobHttpRequest {
831            method,
832            path,
833            headers,
834            body,
835        },
836        stream,
837    ))
838}
839
/// Reads a chunked request body from `reader` and returns the concatenated
/// chunk data.
///
/// Chunk extensions after `;` are ignored. A chunk size of zero ends the body,
/// after which trailer lines are consumed up to the blank line (or EOF). A
/// size line that does not parse as hexadecimal is treated like a zero size.
///
/// # Errors
///
/// Propagates any I/O error from the underlying reads.
fn read_chunked_body(reader: &mut BufReader<TcpStream>) -> std::io::Result<Vec<u8>> {
    let mut body = Vec::new();
    let mut line = String::new();
    loop {
        line.clear();
        reader.read_line(&mut line)?;
        let trimmed = line.trim();
        let size_text = match trimmed.split_once(';') {
            Some((size, _extension)) => size,
            None => trimmed,
        };
        let size = usize::from_str_radix(size_text, 16).unwrap_or(0);

        if size == 0 {
            // Last chunk: skip trailers until the terminating blank line.
            loop {
                line.clear();
                reader.read_line(&mut line)?;
                if line.is_empty() || line == "\r\n" {
                    return Ok(body);
                }
            }
        }

        let mut chunk = vec![0u8; size];
        reader.read_exact(&mut chunk)?;
        body.extend_from_slice(&chunk);

        // Each chunk is followed by a two-byte line terminator.
        let mut terminator = [0u8; 2];
        reader.read_exact(&mut terminator)?;
    }
}
867
868fn blob_response_for(
869    request: &TestBlobHttpRequest,
870    addr: SocketAddr,
871    options: &TestBlobServerOptions,
872) -> Vec<u8> {
873    let encoded_hash = encoded_blob_hash(&options.hash);
874    let complete_path = format!("/sync/blobs/{encoded_hash}/complete");
875    let signed_url_path = format!("/sync/blobs/{encoded_hash}/url");
876    let body = match request.path.as_str() {
877        "/sync/blobs/upload" => format!(
878            r#"{{"exists":false,"uploadUrl":"http://{addr}{}","uploadMethod":"PUT","uploadHeaders":{{"x-upload-token":"{}"}}}}"#,
879            options.upload_path, options.upload_token
880        )
881        .into_bytes(),
882        path if path == options.upload_path.as_str() => b"OK".to_vec(),
883        path if path == complete_path.as_str() => br#"{"ok":true}"#.to_vec(),
884        path if path == signed_url_path.as_str() => format!(
885            r#"{{"url":"http://{addr}{}","expiresAt":"2099-01-01T00:00:00.000Z"}}"#,
886            options.download_path
887        )
888        .into_bytes(),
889        path if path == options.download_path.as_str() => options.bytes.clone(),
890        _ => br#"{"error":"NOT_FOUND"}"#.to_vec(),
891    };
892    let status = if request.path == options.download_path
893        || request.path == options.upload_path
894        || request.path.starts_with("/sync/blobs/")
895    {
896        "200 OK"
897    } else {
898        "404 Not Found"
899    };
900    let content_type = if request.path == options.download_path {
901        "application/octet-stream"
902    } else {
903        "application/json"
904    };
905    let head = format!(
906        "HTTP/1.1 {status}\r\ncontent-type: {content_type}\r\ncontent-length: {}\r\nconnection: close\r\n\r\n",
907        body.len()
908    );
909    let mut response = head.into_bytes();
910    response.extend(body);
911    response
912}
913
914fn handle_connection(stream: &mut TcpStream, state: &Arc<Mutex<TestHttpState>>) {
915    let request = read_http_request(stream);
916    if request.body.is_empty() && request.method.is_empty() {
917        return;
918    }
919    let response = {
920        let mut state = state.lock().expect("test http server state");
921        state.requests.push(request);
922        state
923            .responses
924            .pop_front()
925            .unwrap_or_else(empty_success_response)
926    };
927    write_http_response(stream, response);
928}
929
930fn read_http_request(stream: &mut TcpStream) -> TestHttpRequest {
931    let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
932    let mut buffer = Vec::new();
933    let mut chunk = [0u8; 8192];
934    loop {
935        let Ok(read) = stream.read(&mut chunk) else {
936            break;
937        };
938        if read == 0 {
939            break;
940        }
941        buffer.extend_from_slice(&chunk[..read]);
942        if request_body_complete(&buffer) {
943            break;
944        }
945    }
946    parse_http_request(&String::from_utf8_lossy(&buffer))
947}
948
/// Reports whether `buffer` holds a complete HTTP request: the header section
/// terminated by a blank line, followed by at least `content-length` body
/// bytes.
///
/// The body is measured in raw bytes. Measuring it after a lossy UTF-8
/// conversion would over-count, because each invalid or truncated sequence
/// expands to a three-byte replacement character, so a body split in the
/// middle of a multi-byte character would be declared complete too early.
///
/// A missing or unparsable `content-length` counts as zero.
fn request_body_complete(buffer: &[u8]) -> bool {
    const HEAD_END: &[u8] = b"\r\n\r\n";

    let Some(head_len) = buffer
        .windows(HEAD_END.len())
        .position(|window| window == HEAD_END)
    else {
        return false;
    };

    // Only the header section is decoded as text; the body stays as bytes.
    let head = String::from_utf8_lossy(&buffer[..head_len]);
    let content_length = head
        .lines()
        .find_map(|line| {
            let (name, value) = line.split_once(':')?;
            if name.eq_ignore_ascii_case("content-length") {
                value.trim().parse::<usize>().ok()
            } else {
                None
            }
        })
        .unwrap_or(0);

    let body_len = buffer.len() - head_len - HEAD_END.len();
    body_len >= content_length
}
967
968fn parse_http_request(raw: &str) -> TestHttpRequest {
969    let (head, body) = raw.split_once("\r\n\r\n").unwrap_or((raw, ""));
970    let mut lines = head.lines();
971    let request_line = lines.next().unwrap_or_default();
972    let mut request_parts = request_line.split_whitespace();
973    let method = request_parts.next().unwrap_or_default().to_string();
974    let path = request_parts.next().unwrap_or_default().to_string();
975    let headers = lines
976        .filter_map(|line| {
977            let (name, value) = line.split_once(':')?;
978            Some((name.trim().to_ascii_lowercase(), value.trim().to_string()))
979        })
980        .collect();
981    TestHttpRequest {
982        method,
983        path,
984        headers,
985        body: body.to_string(),
986    }
987}
988
989fn write_http_response(stream: &mut TcpStream, response: TestHttpResponse) {
990    let body = response.body;
991    let message = format!(
992        "HTTP/1.1 {} {}\r\ncontent-type: {}\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
993        response.status,
994        response.reason,
995        response.content_type,
996        body.len(),
997        body
998    );
999    let _ = stream.write_all(message.as_bytes());
1000}