1use std::collections::{BTreeMap, VecDeque};
2use std::io::{BufRead, BufReader, Read, Write};
3use std::net::{SocketAddr, TcpListener, TcpStream};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::mpsc::{self, Sender};
6use std::sync::{Arc, Mutex};
7use std::thread::{self, JoinHandle};
8use std::time::{Duration, Instant};
9
10use serde::Deserialize;
11use serde_json::{json, Value};
12use syncular_runtime::error::Result;
13use syncular_runtime::protocol::{
14 AuthLeaseProvenance, CombinedRequest, CombinedResponse, PushBatchRequest, PushCommitRequest,
15 SyncOperation,
16};
17use syncular_runtime::transport::{SyncAuthHeaders, SyncTransport};
18use tungstenite::{http::StatusCode, Message};
19
20use crate::app_server::AppTestServer;
21
/// An HTTP request captured by one of the in-process test servers.
///
/// The parsers in this file store header names lower-cased.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TestHttpRequest {
    /// Method token taken from the request line (for example `POST`).
    pub method: String,
    /// Request target as received.
    pub path: String,
    /// Header values keyed by header name.
    pub headers: BTreeMap<String, String>,
    /// Request body as text.
    pub body: String,
}
29
30impl TestHttpRequest {
31 pub fn json(&self) -> Option<Value> {
32 serde_json::from_str(&self.body).ok()
33 }
34
35 pub fn header(&self, name: &str) -> Option<&str> {
36 self.headers
37 .get(&name.to_ascii_lowercase())
38 .map(String::as_str)
39 }
40}
41
/// A canned HTTP response that a test server writes back to the client.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TestHttpResponse {
    /// Numeric status code placed on the status line.
    pub status: u16,
    /// Reason phrase placed after the status code.
    pub reason: String,
    /// Value of the `content-type` response header.
    pub content_type: String,
    /// Response body text.
    pub body: String,
}
49
50impl TestHttpResponse {
51 pub fn json(body: Value) -> Self {
52 Self {
53 status: 200,
54 reason: "OK".to_string(),
55 content_type: "application/json".to_string(),
56 body: body.to_string(),
57 }
58 }
59
60 pub fn status(status: u16, reason: impl Into<String>, body: impl Into<String>) -> Self {
61 Self {
62 status,
63 reason: reason.into(),
64 content_type: "text/plain".to_string(),
65 body: body.into(),
66 }
67 }
68
69 pub fn sync(response: CombinedResponse) -> Self {
70 Self::json(serde_json::to_value(response).expect("combined response JSON"))
71 }
72
73 pub fn auth_expired() -> Self {
74 Self::status(401, "Unauthorized", "expired token")
75 }
76}
77
78#[derive(Debug, Default)]
79struct TestHttpState {
80 requests: Vec<TestHttpRequest>,
81 responses: VecDeque<TestHttpResponse>,
82}
83
84pub struct TestSyncServer {
85 url: String,
86 stop: Arc<AtomicBool>,
87 state: Arc<Mutex<TestHttpState>>,
88 join: Option<JoinHandle<()>>,
89}
90
/// Shared list of per-connection channel senders used to notify WebSocket peers.
type WsBroadcaster = Arc<Mutex<Vec<Sender<()>>>>;
92
93#[derive(Debug, Deserialize)]
94#[serde(rename_all = "camelCase")]
95struct TestWsPushMessage {
96 #[serde(rename = "type")]
97 message_type: String,
98 request_id: String,
99 client_commit_id: String,
100 operations: Vec<SyncOperation>,
101 schema_version: i32,
102 #[serde(default)]
103 auth_lease: Option<AuthLeaseProvenance>,
104}
105
106pub struct AppTestHttpServer {
107 addr: SocketAddr,
108 app_server: AppTestServer,
109 broadcaster: WsBroadcaster,
110 state: Arc<Mutex<TestHttpState>>,
111 stop: Arc<AtomicBool>,
112 join: Option<JoinHandle<()>>,
113}
114
/// An HTTP request captured by the blob test server, with a binary body.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TestBlobHttpRequest {
    /// Method token taken from the request line.
    pub method: String,
    /// Request target as received.
    pub path: String,
    /// Header values keyed by header name.
    pub headers: BTreeMap<String, String>,
    /// Raw request body bytes.
    pub body: Vec<u8>,
}
122
123impl TestBlobHttpRequest {
124 pub fn header(&self, name: &str) -> Option<&str> {
125 self.headers
126 .get(&name.to_ascii_lowercase())
127 .map(String::as_str)
128 }
129}
130
/// Configuration for a blob test server.
#[derive(Clone, Debug)]
pub struct TestBlobServerOptions {
    /// Blob contents served from the download path.
    pub bytes: Vec<u8>,
    /// Blob hash used to build the hash-specific route paths.
    pub hash: String,
    /// Path advertised to clients as the upload target.
    pub upload_path: String,
    /// Path advertised to clients as the download target.
    pub download_path: String,
    /// Token advertised to clients in the `x-upload-token` upload header.
    pub upload_token: String,
}
139
140impl TestBlobServerOptions {
141 pub fn new(bytes: Vec<u8>, hash: impl Into<String>) -> Self {
142 Self {
143 bytes,
144 hash: hash.into(),
145 upload_path: "/upload-target".to_string(),
146 download_path: "/download-target".to_string(),
147 upload_token: "upload-token".to_string(),
148 }
149 }
150
151 pub fn upload_path(mut self, upload_path: impl Into<String>) -> Self {
152 self.upload_path = upload_path.into();
153 self
154 }
155
156 pub fn download_path(mut self, download_path: impl Into<String>) -> Self {
157 self.download_path = download_path.into();
158 self
159 }
160
161 pub fn upload_token(mut self, upload_token: impl Into<String>) -> Self {
162 self.upload_token = upload_token.into();
163 self
164 }
165}
166
167#[derive(Debug, Default)]
168struct TestBlobState {
169 requests: Vec<TestBlobHttpRequest>,
170}
171
172pub struct TestBlobServer {
173 addr: SocketAddr,
174 stop: Arc<AtomicBool>,
175 state: Arc<Mutex<TestBlobState>>,
176 join: Option<JoinHandle<()>>,
177}
178
179impl TestBlobServer {
180 pub fn start(bytes: Vec<u8>, hash: impl Into<String>) -> Result<Self> {
181 Self::start_with_options(TestBlobServerOptions::new(bytes, hash))
182 }
183
184 pub fn start_with_options(options: TestBlobServerOptions) -> Result<Self> {
185 let listener = TcpListener::bind("127.0.0.1:0")?;
186 listener.set_nonblocking(true)?;
187 let addr = listener.local_addr()?;
188 let stop = Arc::new(AtomicBool::new(false));
189 let state = Arc::new(Mutex::new(TestBlobState::default()));
190 let thread_stop = stop.clone();
191 let thread_state = state.clone();
192 let join = thread::spawn(move || {
193 while !thread_stop.load(Ordering::Relaxed) {
194 match listener.accept() {
195 Ok((stream, _)) => {
196 handle_blob_connection(stream, addr, &options, &thread_state);
197 }
198 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
199 thread::sleep(Duration::from_millis(5));
200 }
201 Err(_) => break,
202 }
203 }
204 });
205 Ok(Self {
206 addr,
207 stop,
208 state,
209 join: Some(join),
210 })
211 }
212
213 pub fn addr(&self) -> SocketAddr {
214 self.addr
215 }
216
217 pub fn sync_base_url(&self) -> String {
218 format!("http://{}/sync", self.addr)
219 }
220
221 pub fn requests(&self) -> Vec<TestBlobHttpRequest> {
222 self.state
223 .lock()
224 .expect("test blob server state")
225 .requests
226 .clone()
227 }
228
229 pub fn wait_for_requests(
230 &self,
231 expected: usize,
232 timeout: Duration,
233 ) -> Vec<TestBlobHttpRequest> {
234 let deadline = Instant::now() + timeout;
235 loop {
236 let requests = self.requests();
237 if requests.len() >= expected || Instant::now() >= deadline {
238 return requests;
239 }
240 thread::sleep(Duration::from_millis(5));
241 }
242 }
243}
244
245impl Drop for TestBlobServer {
246 fn drop(&mut self) {
247 self.stop.store(true, Ordering::Relaxed);
248 let _ = TcpStream::connect(self.addr);
249 if let Some(join) = self.join.take() {
250 let _ = join.join();
251 }
252 }
253}
254
255impl TestSyncServer {
256 pub fn spawn(responses: impl IntoIterator<Item = TestHttpResponse>) -> Result<Self> {
257 let listener = TcpListener::bind("127.0.0.1:0")?;
258 listener.set_nonblocking(true)?;
259 let addr = listener.local_addr()?;
260 let stop = Arc::new(AtomicBool::new(false));
261 let state = Arc::new(Mutex::new(TestHttpState {
262 requests: Vec::new(),
263 responses: responses.into_iter().collect(),
264 }));
265 let thread_stop = stop.clone();
266 let thread_state = state.clone();
267 let join = thread::spawn(move || {
268 while !thread_stop.load(Ordering::Relaxed) {
269 match listener.accept() {
270 Ok((mut stream, _)) => handle_connection(&mut stream, &thread_state),
271 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
272 thread::sleep(Duration::from_millis(5));
273 }
274 Err(_) => break,
275 }
276 }
277 });
278 Ok(Self {
279 url: format!("http://{addr}/sync"),
280 stop,
281 state,
282 join: Some(join),
283 })
284 }
285
286 pub fn empty_success() -> Result<Self> {
287 Self::spawn([empty_success_response()])
288 }
289
290 pub fn sync_responses(responses: impl IntoIterator<Item = CombinedResponse>) -> Result<Self> {
291 Self::spawn(responses.into_iter().map(TestHttpResponse::sync))
292 }
293
294 pub fn status(status: u16, reason: impl Into<String>, body: impl Into<String>) -> Result<Self> {
295 Self::spawn([TestHttpResponse::status(status, reason, body)])
296 }
297
298 pub fn url(&self) -> String {
299 self.url.clone()
300 }
301
302 pub fn requests(&self) -> Vec<TestHttpRequest> {
303 self.state
304 .lock()
305 .expect("test http server state")
306 .requests
307 .clone()
308 }
309
310 pub fn request_jsons(&self) -> Vec<Value> {
311 self.requests()
312 .into_iter()
313 .filter_map(|request| request.json())
314 .collect()
315 }
316
317 pub fn wait_for_requests(&self, expected: usize, timeout: Duration) -> Vec<TestHttpRequest> {
318 let deadline = Instant::now() + timeout;
319 loop {
320 let requests = self.requests();
321 if requests.len() >= expected || Instant::now() >= deadline {
322 return requests;
323 }
324 thread::sleep(Duration::from_millis(5));
325 }
326 }
327
328 pub fn push_response(&self, response: TestHttpResponse) {
329 self.state
330 .lock()
331 .expect("test http server state")
332 .responses
333 .push_back(response);
334 }
335
336 pub fn push_sync_response(&self, response: CombinedResponse) {
337 self.push_response(TestHttpResponse::sync(response));
338 }
339
340 pub fn push_json_response(&self, body: Value) {
341 self.push_response(TestHttpResponse::json(body));
342 }
343}
344
345impl Drop for TestSyncServer {
346 fn drop(&mut self) {
347 self.stop.store(true, Ordering::Relaxed);
348 let _ = TcpStream::connect(
349 self.url
350 .trim_start_matches("http://")
351 .trim_end_matches("/sync"),
352 );
353 if let Some(join) = self.join.take() {
354 let _ = join.join();
355 }
356 }
357}
358
359impl AppTestHttpServer {
360 pub fn start(app_schema: syncular_runtime::app_schema::AppSchema) -> Result<Self> {
361 Self::start_with_server(AppTestServer::new(app_schema))
362 }
363
364 pub fn start_with_server(app_server: AppTestServer) -> Result<Self> {
365 let listener = TcpListener::bind("127.0.0.1:0")?;
366 listener.set_nonblocking(true)?;
367 let addr = listener.local_addr()?;
368 let broadcaster: WsBroadcaster = Arc::new(Mutex::new(Vec::new()));
369 let state = Arc::new(Mutex::new(TestHttpState::default()));
370 let stop = Arc::new(AtomicBool::new(false));
371 let thread_stop = stop.clone();
372 let thread_server = app_server.clone();
373 let thread_broadcaster = broadcaster.clone();
374 let thread_state = state.clone();
375 let join = thread::spawn(move || {
376 while !thread_stop.load(Ordering::Relaxed) {
377 match listener.accept() {
378 Ok((stream, _)) => {
379 if thread_stop.load(Ordering::Relaxed) {
380 break;
381 }
382 let _ = stream.set_nonblocking(false);
383 let connection_server = thread_server.clone();
384 let connection_broadcaster = thread_broadcaster.clone();
385 let connection_stop = thread_stop.clone();
386 let connection_state = thread_state.clone();
387 thread::spawn(move || {
388 handle_app_test_http_connection(
389 stream,
390 &connection_server,
391 &connection_broadcaster,
392 &connection_state,
393 &connection_stop,
394 );
395 });
396 }
397 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
398 thread::sleep(Duration::from_millis(5));
399 }
400 Err(_) => break,
401 }
402 }
403 });
404
405 Ok(Self {
406 addr,
407 app_server,
408 broadcaster,
409 state,
410 stop,
411 join: Some(join),
412 })
413 }
414
415 pub fn addr(&self) -> SocketAddr {
416 self.addr
417 }
418
419 pub fn url(&self) -> String {
420 format!("http://{}/sync", self.addr)
421 }
422
423 pub fn realtime_url(&self, client_id: &str) -> String {
424 format!(
425 "ws://{}/sync/realtime?clientId={}",
426 self.addr,
427 client_id.replace(' ', "%20")
428 )
429 }
430
431 pub fn app_server(&self) -> &AppTestServer {
432 &self.app_server
433 }
434
435 pub fn requests(&self) -> Vec<TestHttpRequest> {
436 self.state
437 .lock()
438 .expect("app test HTTP server state")
439 .requests
440 .clone()
441 }
442
443 pub fn wait_for_requests(&self, expected: usize, timeout: Duration) -> Vec<TestHttpRequest> {
444 let deadline = Instant::now() + timeout;
445 loop {
446 let requests = self.requests();
447 if requests.len() >= expected || Instant::now() >= deadline {
448 return requests;
449 }
450 thread::sleep(Duration::from_millis(5));
451 }
452 }
453
454 pub fn push_realtime_sync(&self) {
455 broadcast_realtime_sync(&self.broadcaster);
456 }
457}
458
459impl Drop for AppTestHttpServer {
460 fn drop(&mut self) {
461 let _ = self.broadcaster.lock().map(|peers| peers.len());
462 self.stop.store(true, Ordering::Relaxed);
463 let _ = TcpStream::connect(self.addr);
464 if let Some(join) = self.join.take() {
465 let _ = join.join();
466 }
467 }
468}
469
470pub fn empty_success_response() -> TestHttpResponse {
471 TestHttpResponse::json(json!({
472 "ok": true,
473 "push": null,
474 "pull": {
475 "ok": true,
476 "subscriptions": []
477 }
478 }))
479}
480
/// Returns `hash` with every `:` replaced by `%3A`, as used in blob route paths.
///
/// No other character is escaped.
pub fn encoded_blob_hash(hash: &str) -> String {
    let mut encoded = String::with_capacity(hash.len());
    for ch in hash.chars() {
        if ch == ':' {
            encoded.push_str("%3A");
        } else {
            encoded.push(ch);
        }
    }
    encoded
}
484
485fn handle_app_test_http_connection(
486 mut stream: TcpStream,
487 server: &AppTestServer,
488 broadcaster: &WsBroadcaster,
489 state: &Arc<Mutex<TestHttpState>>,
490 stop: &Arc<AtomicBool>,
491) {
492 if is_websocket_request(&stream) {
493 handle_app_test_ws_connection(stream, server, broadcaster, state, stop);
494 return;
495 }
496
497 let request = read_http_request(&mut stream);
498 if request.body.is_empty() && request.method.is_empty() {
499 return;
500 }
501 record_app_test_http_request(state, request.clone());
502 if request.method != "POST" || request.path != "/sync" {
503 write_http_response(
504 &mut stream,
505 TestHttpResponse::status(404, "Not Found", "not found"),
506 );
507 return;
508 }
509 let auth_headers = auth_headers_from_request(&request);
510 server.record_auth_headers(auth_headers.clone());
511 if !server.is_authorized_headers(&auth_headers) {
512 write_http_response(
513 &mut stream,
514 TestHttpResponse::status(401, "Unauthorized", "unauthorized"),
515 );
516 return;
517 }
518
519 let response = (|| {
520 let request = serde_json::from_str::<CombinedRequest>(&request.body)?;
521 let before_commits = server.commits().len();
522 let response = server.post_sync(&request)?;
523 let broadcast = server.commits().len() > before_commits;
524 let body = serde_json::to_string(&response)?;
525 Ok::<_, syncular_runtime::error::SyncularError>((body, broadcast))
526 })();
527
528 match response {
529 Ok((body, broadcast)) => {
530 write_http_response(
531 &mut stream,
532 TestHttpResponse {
533 status: 200,
534 reason: "OK".to_string(),
535 content_type: "application/json".to_string(),
536 body,
537 },
538 );
539 if broadcast {
540 broadcast_realtime_sync(broadcaster);
541 }
542 }
543 Err(error) => {
544 write_http_response(
545 &mut stream,
546 TestHttpResponse {
547 status: 500,
548 reason: "Internal Server Error".to_string(),
549 content_type: "application/json".to_string(),
550 body: json!({ "error": error.to_string() }).to_string(),
551 },
552 );
553 }
554 }
555}
556
/// Peeks at the first bytes of `stream` (without consuming them) and reports
/// whether they start with `GET`.
///
/// Returns `false` when peeking fails or fewer than three bytes are available.
fn is_websocket_request(stream: &TcpStream) -> bool {
    let mut prefix = [0u8; 4];
    stream
        .peek(&mut prefix)
        .is_ok_and(|available| prefix[..available].starts_with(b"GET"))
}
564
565fn handle_app_test_ws_connection(
566 stream: TcpStream,
567 server: &AppTestServer,
568 broadcaster: &WsBroadcaster,
569 state: &Arc<Mutex<TestHttpState>>,
570 stop: &Arc<AtomicBool>,
571) {
572 let mut client_id = String::new();
573 let mut socket = match tungstenite::accept_hdr(
574 stream,
575 |request: &tungstenite::handshake::server::Request, response| {
576 client_id = query_param(request.uri().query().unwrap_or_default(), "clientId")
577 .unwrap_or_else(|| "app-test-ws-client".to_string());
578 record_app_test_http_request(state, websocket_request_record(request));
579 let auth_headers = auth_headers_from_ws_request(request);
580 server.record_auth_headers(auth_headers.clone());
581 if !server.is_authorized_headers(&auth_headers) {
582 return Err(tungstenite::handshake::server::Response::builder()
583 .status(StatusCode::UNAUTHORIZED)
584 .body(Some("unauthorized".to_string()))
585 .expect("unauthorized websocket response"));
586 }
587 Ok(response)
588 },
589 ) {
590 Ok(socket) => socket,
591 Err(_) => return,
592 };
593 set_ws_stream_read_timeout(&mut socket, Duration::from_millis(50));
594 let (tx, rx) = mpsc::channel::<()>();
595 broadcaster
596 .lock()
597 .expect("app test ws broadcaster")
598 .push(tx);
599
600 while !stop.load(Ordering::Relaxed) {
601 while rx.try_recv().is_ok() {
602 if socket
603 .send(Message::Text(json!({ "event": "sync" }).to_string().into()))
604 .is_err()
605 {
606 let _ = socket.close(None);
607 return;
608 }
609 }
610 let message = match socket.read() {
611 Ok(Message::Text(text)) => text.to_string(),
612 Ok(Message::Ping(bytes)) => {
613 let _ = socket.send(Message::Pong(bytes));
614 continue;
615 }
616 Ok(Message::Close(_)) => break,
617 Ok(_) => continue,
618 Err(tungstenite::Error::Io(error))
619 if error.kind() == std::io::ErrorKind::WouldBlock
620 || error.kind() == std::io::ErrorKind::TimedOut =>
621 {
622 continue;
623 }
624 Err(_) => break,
625 };
626
627 let response = handle_ws_push_message(server, &client_id, &message);
628 match response {
629 Ok(response) => {
630 if socket
631 .send(Message::Text(response.to_string().into()))
632 .is_err()
633 {
634 break;
635 }
636 broadcast_realtime_sync(broadcaster);
637 }
638 Err(error) => {
639 let _ = socket.send(Message::Text(
640 json!({
641 "event": "error",
642 "data": { "message": error.to_string() }
643 })
644 .to_string()
645 .into(),
646 ));
647 break;
648 }
649 }
650 }
651
652 let _ = socket.close(None);
653}
654
655fn record_app_test_http_request(state: &Arc<Mutex<TestHttpState>>, request: TestHttpRequest) {
656 state
657 .lock()
658 .expect("app test HTTP server state")
659 .requests
660 .push(request);
661}
662
663fn websocket_request_record(request: &tungstenite::handshake::server::Request) -> TestHttpRequest {
664 TestHttpRequest {
665 method: request.method().to_string(),
666 path: request
667 .uri()
668 .path_and_query()
669 .map(|path| path.as_str().to_string())
670 .unwrap_or_else(|| request.uri().path().to_string()),
671 headers: request
672 .headers()
673 .iter()
674 .filter_map(|(name, value)| {
675 Some((
676 name.as_str().to_ascii_lowercase(),
677 value.to_str().ok()?.to_string(),
678 ))
679 })
680 .collect(),
681 body: String::new(),
682 }
683}
684
685fn auth_headers_from_request(request: &TestHttpRequest) -> SyncAuthHeaders {
686 request
687 .headers
688 .iter()
689 .filter_map(|(name, value)| auth_header_entry(name, value))
690 .collect()
691}
692
693fn auth_headers_from_ws_request(
694 request: &tungstenite::handshake::server::Request,
695) -> SyncAuthHeaders {
696 request
697 .headers()
698 .iter()
699 .filter_map(|(name, value)| auth_header_entry(name.as_str(), value.to_str().ok()?))
700 .collect()
701}
702
/// Returns the `(lower-cased name, value)` pair for an auth-related header.
///
/// A header qualifies when its name, compared case-insensitively, is
/// `authorization` or starts with `x-syncular-` — except
/// `x-syncular-schema-version`, which is excluded. Other headers yield `None`.
fn auth_header_entry(name: &str, value: &str) -> Option<(String, String)> {
    let lowered = name.to_ascii_lowercase();
    let is_auth_header = match lowered.as_str() {
        "authorization" => true,
        "x-syncular-schema-version" => false,
        other => other.starts_with("x-syncular-"),
    };
    is_auth_header.then(|| (lowered, value.to_string()))
}
713
714fn handle_ws_push_message(server: &AppTestServer, client_id: &str, message: &str) -> Result<Value> {
715 let message = serde_json::from_str::<TestWsPushMessage>(message)?;
716 if message.message_type != "push" {
717 return Err(syncular_runtime::error::SyncularError::protocol_message(
718 format!(
719 "unsupported websocket message type: {}",
720 message.message_type
721 ),
722 ));
723 }
724 let request_id = message.request_id;
725 let request = CombinedRequest {
726 client_id: client_id.to_string(),
727 push: Some(PushBatchRequest {
728 commits: vec![PushCommitRequest {
729 client_commit_id: message.client_commit_id,
730 operations: message.operations,
731 schema_version: message.schema_version,
732 auth_lease: message.auth_lease,
733 }],
734 }),
735 pull: None,
736 };
737 let combined = server.post_sync(&request)?;
738 let response = combined
739 .push
740 .and_then(|push| push.commits.into_iter().next())
741 .ok_or_else(|| {
742 syncular_runtime::error::SyncularError::protocol_message(
743 "missing websocket push response",
744 )
745 })?;
746 Ok(json!({
747 "event": "push-response",
748 "data": {
749 "requestId": request_id,
750 "clientCommitId": response.client_commit_id,
751 "status": response.status,
752 "commitSeq": response.commit_seq,
753 "results": response.results,
754 }
755 }))
756}
757
758fn broadcast_realtime_sync(broadcaster: &WsBroadcaster) {
759 let mut peers = broadcaster.lock().expect("app test ws broadcaster");
760 peers.retain(|sender| sender.send(()).is_ok());
761}
762
763fn set_ws_stream_read_timeout(socket: &mut tungstenite::WebSocket<TcpStream>, timeout: Duration) {
764 let _ = socket.get_mut().set_read_timeout(Some(timeout));
765}
766
/// Returns the value of the first `name=value` pair in `query`.
///
/// Pairs are separated by `&`; pairs without `=` are ignored. The only
/// decoding applied to the value is replacing `%20` with a space.
fn query_param(query: &str, name: &str) -> Option<String> {
    for pair in query.split('&') {
        let Some((key, value)) = pair.split_once('=') else {
            continue;
        };
        if key == name {
            return Some(value.replace("%20", " "));
        }
    }
    None
}
777
778fn handle_blob_connection(
779 stream: TcpStream,
780 addr: SocketAddr,
781 options: &TestBlobServerOptions,
782 state: &Arc<Mutex<TestBlobState>>,
783) {
784 let Ok((request, mut stream)) = read_blob_http_request(stream) else {
785 return;
786 };
787 let response = blob_response_for(&request, addr, options);
788 let _ = stream.write_all(response.as_slice());
789 state
790 .lock()
791 .expect("test blob server state")
792 .requests
793 .push(request);
794}
795
796fn read_blob_http_request(stream: TcpStream) -> std::io::Result<(TestBlobHttpRequest, TcpStream)> {
797 let mut reader = BufReader::new(stream);
798 let mut request_line = String::new();
799 reader.read_line(&mut request_line)?;
800 let mut request_parts = request_line.split_whitespace();
801 let method = request_parts.next().unwrap_or_default().to_string();
802 let path = request_parts.next().unwrap_or("/").to_string();
803 let mut headers = BTreeMap::new();
804 loop {
805 let mut line = String::new();
806 reader.read_line(&mut line)?;
807 if line == "\r\n" || line.is_empty() {
808 break;
809 }
810 if let Some((name, value)) = line.split_once(':') {
811 headers.insert(name.trim().to_ascii_lowercase(), value.trim().to_string());
812 }
813 }
814 let content_length = headers
815 .get("content-length")
816 .and_then(|value| value.parse::<usize>().ok())
817 .unwrap_or(0);
818 let body = if headers
819 .get("transfer-encoding")
820 .is_some_and(|value| value.to_ascii_lowercase().contains("chunked"))
821 {
822 read_chunked_body(&mut reader)?
823 } else {
824 let mut body = vec![0u8; content_length];
825 reader.read_exact(&mut body)?;
826 body
827 };
828 let stream = reader.into_inner();
829 Ok((
830 TestBlobHttpRequest {
831 method,
832 path,
833 headers,
834 body,
835 },
836 stream,
837 ))
838}
839
/// Reads a chunked HTTP body from `reader` and returns the concatenated chunks.
///
/// Each chunk-size line is parsed as hexadecimal, ignoring anything after a
/// `;`. A size of zero — or a size line that cannot be parsed — ends the body,
/// after which trailer lines are consumed up to the blank line or EOF.
///
/// # Errors
/// Returns any I/O error raised while reading, including an unexpected EOF
/// inside a chunk or its trailing CRLF.
fn read_chunked_body(reader: &mut BufReader<TcpStream>) -> std::io::Result<Vec<u8>> {
    let mut body = Vec::new();
    let mut line = String::new();

    loop {
        line.clear();
        reader.read_line(&mut line)?;
        let trimmed = line.trim();
        let size_text = match trimmed.split_once(';') {
            Some((size, _extension)) => size,
            None => trimmed,
        };
        let size = usize::from_str_radix(size_text, 16).unwrap_or(0);
        if size == 0 {
            break;
        }

        let mut chunk = vec![0u8; size];
        reader.read_exact(&mut chunk)?;
        body.extend_from_slice(&chunk);

        // Each chunk is followed by a CRLF that is not part of the data.
        let mut crlf = [0u8; 2];
        reader.read_exact(&mut crlf)?;
    }

    // Skip trailer lines after the terminating chunk.
    loop {
        line.clear();
        reader.read_line(&mut line)?;
        if line.is_empty() || line == "\r\n" {
            return Ok(body);
        }
    }
}
867
868fn blob_response_for(
869 request: &TestBlobHttpRequest,
870 addr: SocketAddr,
871 options: &TestBlobServerOptions,
872) -> Vec<u8> {
873 let encoded_hash = encoded_blob_hash(&options.hash);
874 let complete_path = format!("/sync/blobs/{encoded_hash}/complete");
875 let signed_url_path = format!("/sync/blobs/{encoded_hash}/url");
876 let body = match request.path.as_str() {
877 "/sync/blobs/upload" => format!(
878 r#"{{"exists":false,"uploadUrl":"http://{addr}{}","uploadMethod":"PUT","uploadHeaders":{{"x-upload-token":"{}"}}}}"#,
879 options.upload_path, options.upload_token
880 )
881 .into_bytes(),
882 path if path == options.upload_path.as_str() => b"OK".to_vec(),
883 path if path == complete_path.as_str() => br#"{"ok":true}"#.to_vec(),
884 path if path == signed_url_path.as_str() => format!(
885 r#"{{"url":"http://{addr}{}","expiresAt":"2099-01-01T00:00:00.000Z"}}"#,
886 options.download_path
887 )
888 .into_bytes(),
889 path if path == options.download_path.as_str() => options.bytes.clone(),
890 _ => br#"{"error":"NOT_FOUND"}"#.to_vec(),
891 };
892 let status = if request.path == options.download_path
893 || request.path == options.upload_path
894 || request.path.starts_with("/sync/blobs/")
895 {
896 "200 OK"
897 } else {
898 "404 Not Found"
899 };
900 let content_type = if request.path == options.download_path {
901 "application/octet-stream"
902 } else {
903 "application/json"
904 };
905 let head = format!(
906 "HTTP/1.1 {status}\r\ncontent-type: {content_type}\r\ncontent-length: {}\r\nconnection: close\r\n\r\n",
907 body.len()
908 );
909 let mut response = head.into_bytes();
910 response.extend(body);
911 response
912}
913
914fn handle_connection(stream: &mut TcpStream, state: &Arc<Mutex<TestHttpState>>) {
915 let request = read_http_request(stream);
916 if request.body.is_empty() && request.method.is_empty() {
917 return;
918 }
919 let response = {
920 let mut state = state.lock().expect("test http server state");
921 state.requests.push(request);
922 state
923 .responses
924 .pop_front()
925 .unwrap_or_else(empty_success_response)
926 };
927 write_http_response(stream, response);
928}
929
930fn read_http_request(stream: &mut TcpStream) -> TestHttpRequest {
931 let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
932 let mut buffer = Vec::new();
933 let mut chunk = [0u8; 8192];
934 loop {
935 let Ok(read) = stream.read(&mut chunk) else {
936 break;
937 };
938 if read == 0 {
939 break;
940 }
941 buffer.extend_from_slice(&chunk[..read]);
942 if request_body_complete(&buffer) {
943 break;
944 }
945 }
946 parse_http_request(&String::from_utf8_lossy(&buffer))
947}
948
/// Reports whether `buffer` holds a complete HTTP request.
///
/// A request is complete once the blank line ending the header section has
/// arrived and at least `content-length` body bytes follow it. A missing or
/// unparsable `content-length` counts as zero.
///
/// The body is measured in raw bytes. Measuring the lossy-UTF-8 text instead
/// would over-count whenever a multi-byte character is split across reads
/// (each invalid sequence becomes a three-byte replacement character) and
/// report the request as complete before all body bytes have arrived.
fn request_body_complete(buffer: &[u8]) -> bool {
    const HEAD_TERMINATOR: &[u8] = b"\r\n\r\n";

    let Some(head_len) = buffer
        .windows(HEAD_TERMINATOR.len())
        .position(|window| window == HEAD_TERMINATOR)
    else {
        return false;
    };

    // Header text is only inspected for an ASCII header, so lossy decoding of
    // the head alone is harmless.
    let head = String::from_utf8_lossy(&buffer[..head_len]);
    let content_length = head
        .lines()
        .find_map(|line| {
            let (name, value) = line.split_once(':')?;
            if name.eq_ignore_ascii_case("content-length") {
                value.trim().parse::<usize>().ok()
            } else {
                None
            }
        })
        .unwrap_or(0);

    let body_len = buffer.len() - (head_len + HEAD_TERMINATOR.len());
    body_len >= content_length
}
967
968fn parse_http_request(raw: &str) -> TestHttpRequest {
969 let (head, body) = raw.split_once("\r\n\r\n").unwrap_or((raw, ""));
970 let mut lines = head.lines();
971 let request_line = lines.next().unwrap_or_default();
972 let mut request_parts = request_line.split_whitespace();
973 let method = request_parts.next().unwrap_or_default().to_string();
974 let path = request_parts.next().unwrap_or_default().to_string();
975 let headers = lines
976 .filter_map(|line| {
977 let (name, value) = line.split_once(':')?;
978 Some((name.trim().to_ascii_lowercase(), value.trim().to_string()))
979 })
980 .collect();
981 TestHttpRequest {
982 method,
983 path,
984 headers,
985 body: body.to_string(),
986 }
987}
988
989fn write_http_response(stream: &mut TcpStream, response: TestHttpResponse) {
990 let body = response.body;
991 let message = format!(
992 "HTTP/1.1 {} {}\r\ncontent-type: {}\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
993 response.status,
994 response.reason,
995 response.content_type,
996 body.len(),
997 body
998 );
999 let _ = stream.write_all(message.as_bytes());
1000}