1use std::{
7 io::{self},
8 sync::{Arc, Mutex},
9};
10
11use crate::msgs::{
12 AnyRequestId, ObjectId,
13 request::InvalidRequestError,
14 response::{ResponseKind, RpcError, ValidatedResponse},
15};
16
17mod auth;
18mod builder;
19mod connimpl;
20mod stream;
21
22use crate::util::Utf8CString;
23pub use builder::{BuilderError, ConnPtDescription, RpcConnBuilder};
24pub use connimpl::RpcConn;
25use serde::{Deserialize, de::DeserializeOwned};
26pub use stream::StreamError;
27use tor_rpc_connect::{HasClientErrorAction, auth::cookie::CookieAccessError};
28
29#[derive(educe::Educe)]
37#[educe(Debug)]
38pub struct RequestHandle {
39 #[educe(Debug(ignore))]
49 conn: Mutex<Arc<connimpl::Receiver>>,
50 id: AnyRequestId,
52}
53
54#[derive(Clone, Debug, derive_more::AsRef, derive_more::Into)]
74#[as_ref(forward)]
75pub struct SuccessResponse(Utf8CString);
76
77impl SuccessResponse {
78 fn decode<D: DeserializeOwned>(&self) -> Result<D, serde_json::Error> {
80 #[derive(Deserialize)]
82 struct Response<R> {
83 result: R,
85 }
86 let response: Response<D> = serde_json::from_str(self.as_ref())?;
87 Ok(response.result)
88 }
89}
90
91#[derive(Clone, Debug, derive_more::AsRef, derive_more::Into)]
99#[as_ref(forward)]
100pub struct UpdateResponse(Utf8CString);
101
102#[derive(Clone, Debug, derive_more::AsRef, derive_more::Into)]
116#[as_ref(forward)]
117pub struct ErrorResponse(Utf8CString);
119impl ErrorResponse {
120 pub(crate) fn from_validated_string(s: Utf8CString) -> Self {
124 ErrorResponse(s)
125 }
126
127 pub(crate) fn internal_error(&self, cmd: &str) -> ProtoError {
131 ProtoError::InternalRequestFailed(UnexpectedReply {
132 request: cmd.to_string(),
133 reply: self.to_string(),
134 problem: UnexpectedReplyProblem::ErrorNotExpected,
135 })
136 }
137
138 pub fn decode(&self) -> RpcError {
140 crate::msgs::response::try_decode_response_as_err(self.0.as_ref())
141 .expect("Could not decode response that was already decoded as an error?")
142 .expect("Could not extract error from response that was already decoded as an error?")
143 }
144}
145
146impl std::fmt::Display for ErrorResponse {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 let e = self.decode();
149 write!(f, "Peer said {:?}", e.message())
150 }
151}
152
153type FinalResponse = Result<SuccessResponse, ErrorResponse>;
156
157#[derive(Clone, Debug)]
159#[allow(clippy::exhaustive_structs)]
160pub enum AnyResponse {
161 Success(SuccessResponse),
163 Error(ErrorResponse),
165 Update(UpdateResponse),
167}
168impl AnyResponse {
171 fn from_validated(v: ValidatedResponse) -> Self {
173 match v.meta.kind {
176 ResponseKind::Error => AnyResponse::Error(ErrorResponse::from_validated_string(v.msg)),
177 ResponseKind::Success => AnyResponse::Success(SuccessResponse(v.msg)),
178 ResponseKind::Update => AnyResponse::Update(UpdateResponse(v.msg)),
179 }
180 }
181
182 #[cfg(feature = "ffi")]
184 pub(crate) fn into_string(self) -> Utf8CString {
185 match self {
186 AnyResponse::Success(m) => m.into(),
187 AnyResponse::Error(m) => m.into(),
188 AnyResponse::Update(m) => m.into(),
189 }
190 }
191}
192
193impl RpcConn {
194 pub fn session(&self) -> Option<&ObjectId> {
202 self.session.as_ref()
203 }
204
205 pub fn execute(&self, cmd: &str) -> Result<FinalResponse, ProtoError> {
214 let hnd = self.execute_with_handle(cmd)?;
215 hnd.wait()
216 }
217
218 pub(crate) fn execute_internal<T: DeserializeOwned>(
229 &self,
230 cmd: &str,
231 ) -> Result<Result<T, ErrorResponse>, ProtoError> {
232 match self.execute(cmd)? {
233 Ok(success) => match success.decode::<T>() {
234 Ok(result) => Ok(Ok(result)),
235 Err(json_error) => Err(ProtoError::InternalRequestFailed(UnexpectedReply {
236 request: cmd.to_string(),
237 reply: Utf8CString::from(success).to_string(),
238 problem: UnexpectedReplyProblem::CannotDecode(Arc::new(json_error)),
239 })),
240 },
241 Err(error) => Ok(Err(error)),
242 }
243 }
244
245 pub(crate) fn execute_internal_ok<T: DeserializeOwned>(
253 &self,
254 cmd: &str,
255 ) -> Result<T, ProtoError> {
256 match self.execute_internal(cmd)? {
257 Ok(v) => Ok(v),
258 Err(err_response) => Err(err_response.internal_error(cmd)),
259 }
260 }
261
262 pub fn cancel(&self, request_id: &AnyRequestId) -> Result<(), ProtoError> {
264 #[derive(serde::Serialize, Debug)]
266 struct CancelParams<'a> {
267 request_id: &'a AnyRequestId,
269 }
270
271 let request = crate::msgs::request::Request::new(
272 ObjectId::connection_id(),
273 "rpc:cancel",
274 CancelParams { request_id },
275 );
276 match self.execute_internal::<EmptyReply>(&request.encode()?)? {
277 Ok(EmptyReply {}) => Ok(()),
278 Err(_) => Err(ProtoError::RequestCompleted),
279 }
280 }
281
282 pub fn execute_with_handle(&self, cmd: &str) -> Result<RequestHandle, ProtoError> {
285 self.send_request(cmd)
286 }
287 pub fn execute_with_updates<F>(
289 &self,
290 cmd: &str,
291 mut update_cb: F,
292 ) -> Result<FinalResponse, ProtoError>
293 where
294 F: FnMut(UpdateResponse) + Send + Sync,
295 {
296 let hnd = self.execute_with_handle(cmd)?;
297 loop {
298 match hnd.wait_with_updates()? {
299 AnyResponse::Success(s) => return Ok(Ok(s)),
300 AnyResponse::Error(e) => return Ok(Err(e)),
301 AnyResponse::Update(u) => update_cb(u),
302 }
303 }
304 }
305
306 pub(crate) fn release_obj(&self, obj: ObjectId) -> Result<(), ProtoError> {
311 let release_request = crate::msgs::request::Request::new(obj, "rpc:release", NoParams {});
312 let _empty_response: EmptyReply = self.execute_internal_ok(&release_request.encode()?)?;
313 Ok(())
314 }
315
316 }
318
319impl RequestHandle {
320 pub fn id(&self) -> &AnyRequestId {
322 &self.id
323 }
324 pub fn wait(self) -> Result<FinalResponse, ProtoError> {
332 loop {
333 match self.wait_with_updates()? {
334 AnyResponse::Success(s) => return Ok(Ok(s)),
335 AnyResponse::Error(e) => return Ok(Err(e)),
336 AnyResponse::Update(_) => {}
337 }
338 }
339 }
340 pub fn wait_with_updates(&self) -> Result<AnyResponse, ProtoError> {
353 let conn = self.conn.lock().expect("Poisoned lock");
354 let validated = conn.wait_on_message_for(&self.id)?;
355
356 Ok(AnyResponse::from_validated(validated))
357 }
358
359 }
362
363#[derive(Clone, Debug, thiserror::Error)]
365#[non_exhaustive]
366pub enum ShutdownError {
367 #[error("Unable to read response")]
371 Read(#[source] Arc<io::Error>),
372 #[error("Unable to write request")]
374 Write(#[source] Arc<io::Error>),
375 #[error("Arti sent a message that didn't conform to the RPC protocol: {0:?}")]
377 ProtocolViolated(String),
378 #[error("Arti reported a fatal error: {0:?}")]
380 ProtocolViolationReport(ErrorResponse),
381 #[error("Connection closed")]
385 ConnectionClosed,
386}
387
388impl From<crate::msgs::response::DecodeResponseError> for ShutdownError {
389 fn from(value: crate::msgs::response::DecodeResponseError) -> Self {
390 use crate::msgs::response::DecodeResponseError::*;
391 use ShutdownError as E;
392 match value {
393 JsonProtocolViolation(e) => E::ProtocolViolated(e.to_string()),
394 ProtocolViolation(s) => E::ProtocolViolated(s.to_string()),
395 Fatal(rpc_err) => E::ProtocolViolationReport(rpc_err),
396 }
397 }
398}
399
400#[derive(Clone, Debug, thiserror::Error)]
402#[non_exhaustive]
403pub enum ProtoError {
404 #[error("RPC connection is shut down")]
406 Shutdown(#[from] ShutdownError),
407
408 #[error("Invalid request")]
410 InvalidRequest(#[from] InvalidRequestError),
411
412 #[error("Request ID already in use.")]
414 RequestIdInUse,
415
416 #[error("Request has already completed (or failed)")]
418 RequestCompleted,
419
420 #[error("Internal error: waiting on the same request more than once at a time.")]
424 DuplicateWait,
425
426 #[error("Internal error while encoding request")]
430 CouldNotEncode(#[source] Arc<serde_json::Error>),
431
432 #[error("{0}")]
434 InternalRequestFailed(#[source] UnexpectedReply),
435}
436
437#[derive(Clone, Debug, thiserror::Error)]
439pub struct ConnectFailure {
440 declined: Vec<(builder::ConnPtDescription, ConnectError)>,
442 final_desc: Option<builder::ConnPtDescription>,
444 #[source]
449 pub(crate) final_error: ConnectError,
450}
451
452impl std::fmt::Display for ConnectFailure {
453 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454 write!(f, "Unable to connect")?;
455 if !self.declined.is_empty() {
456 write!(
457 f,
458 " ({} attempts failed{})",
459 self.declined.len(),
460 if matches!(self.final_error, ConnectError::AllAttemptsDeclined) {
461 ""
462 } else {
463 " before fatal error"
464 }
465 )?;
466 }
467 Ok(())
468 }
469}
470
471impl ConnectFailure {
472 pub fn fatal_error_origin(&self) -> Option<&builder::ConnPtDescription> {
475 self.final_desc.as_ref()
476 }
477
478 pub fn declined_attempt_outcomes(
481 &self,
482 ) -> impl Iterator<Item = (&builder::ConnPtDescription, &ConnectError)> {
483 self.declined.iter().map(|(a, b)| (a, b))
485 }
486
487 pub fn display_verbose(&self) -> ConnectFailureVerboseFmt<'_> {
492 ConnectFailureVerboseFmt(self)
493 }
494}
495
496#[derive(Debug, Clone)]
499pub struct ConnectFailureVerboseFmt<'a>(&'a ConnectFailure);
500
501impl<'a> std::fmt::Display for ConnectFailureVerboseFmt<'a> {
502 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
503 use tor_error::ErrorReport as _;
504 writeln!(f, "{}:", self.0)?;
505 for (idx, (origin, error)) in self.0.declined_attempt_outcomes().enumerate() {
506 writeln!(f, " {}. {}: {}", idx + 1, origin, error.report())?;
507 }
508 if let Some(origin) = self.0.fatal_error_origin() {
509 writeln!(
510 f,
511 " {}. [FATAL] {}: {}",
512 self.0.declined.len() + 1,
513 origin,
514 self.0.final_error.report()
515 )?;
516 } else {
517 writeln!(f, " - {}", self.0.final_error.report())?;
518 }
519 Ok(())
520 }
521}
522
523#[derive(Clone, Debug, thiserror::Error)]
525#[non_exhaustive]
526pub enum ConnectError {
527 #[error("Cannot parse connect points from environment variable")]
529 BadEnvironment,
530 #[error("Unable to load and parse connect point")]
532 CannotParse(#[from] tor_rpc_connect::load::LoadError),
533 #[error("Unable to resolve connect point path")]
535 CannotResolvePath(#[source] tor_config_path::CfgPathError),
536 #[error("Unable to resolve connect point")]
538 CannotResolveConnectPoint(#[from] tor_rpc_connect::ResolveError),
539 #[error("Unable to make a connection")]
541 CannotConnect(#[from] tor_rpc_connect::ConnectError),
542 #[error("Connect point stream type was unsupported")]
544 StreamTypeUnsupported,
545 #[error("Did not receive expected banner message upon connecting")]
550 InvalidBanner,
551 #[error("All connect points were declined (or there were none)")]
553 AllAttemptsDeclined,
554 #[error("Connect file was given as a relative path.")]
557 RelativeConnectFile,
558 #[error("Received an error while trying to authenticate: {0}")]
560 AuthenticationFailed(ErrorResponse),
561 #[error("Authentication type is not supported")]
563 AuthenticationNotSupported,
564 #[error("Message not in expected format")]
566 BadMessage(#[source] Arc<serde_json::Error>),
567 #[error("Error while negotiating with Arti")]
569 ProtoError(#[from] ProtoError),
570 #[error("We connected to the server at {ours}, but it thinks it's listening at {theirs}")]
573 ServerAddressMismatch {
574 ours: String,
576 theirs: String,
578 },
579 #[error("Server's cookie MAC was not as expected.")]
581 CookieMismatch,
582 #[error("Unable to load secret cookie value")]
584 LoadCookie(#[from] CookieAccessError),
585}
586
587impl HasClientErrorAction for ConnectError {
588 fn client_action(&self) -> tor_rpc_connect::ClientErrorAction {
589 use ConnectError as E;
590 use tor_rpc_connect::ClientErrorAction as A;
591 match self {
592 E::BadEnvironment => A::Abort,
593 E::CannotParse(e) => e.client_action(),
594 E::CannotResolvePath(_) => A::Abort,
595 E::CannotResolveConnectPoint(e) => e.client_action(),
596 E::CannotConnect(e) => e.client_action(),
597 E::StreamTypeUnsupported => A::Decline,
598 E::InvalidBanner => A::Decline,
599 E::RelativeConnectFile => A::Abort,
600 E::AuthenticationFailed(_) => A::Decline,
601 E::BadMessage(_) => A::Abort,
604 E::ProtoError(e) => e.client_action(),
605 E::AllAttemptsDeclined => A::Abort,
606 E::AuthenticationNotSupported => A::Decline,
607 E::ServerAddressMismatch { .. } => A::Abort,
608 E::CookieMismatch => A::Abort,
609 E::LoadCookie(e) => e.client_action(),
610 }
611 }
612}
613
614impl HasClientErrorAction for ProtoError {
615 fn client_action(&self) -> tor_rpc_connect::ClientErrorAction {
616 use ProtoError as E;
617 use tor_rpc_connect::ClientErrorAction as A;
618 match self {
619 E::Shutdown(_) => A::Decline,
620 E::InternalRequestFailed(_) => A::Decline,
621 E::InvalidRequest(_)
624 | E::RequestIdInUse
625 | E::RequestCompleted
626 | E::DuplicateWait
627 | E::CouldNotEncode(_) => A::Abort,
628 }
629 }
630}
631
632#[derive(Clone, Debug, thiserror::Error)]
638#[error("In response to our request {request:?}, Arti gave the unexpected reply {reply:?}")]
639pub struct UnexpectedReply {
640 request: String,
642 reply: String,
644 #[source]
646 problem: UnexpectedReplyProblem,
647}
648
649#[derive(Clone, Debug, thiserror::Error)]
651enum UnexpectedReplyProblem {
652 #[error("Cannot decode as correct JSON type")]
655 CannotDecode(Arc<serde_json::Error>),
656 #[error("Unexpected error")]
658 ErrorNotExpected,
659}
660
661#[derive(serde::Serialize, Debug)]
663struct NoParams {}
664
665#[derive(serde::Deserialize, Debug)]
667struct EmptyReply {}
668
669#[cfg(test)]
670mod test {
671 #![allow(clippy::bool_assert_comparison)]
673 #![allow(clippy::clone_on_copy)]
674 #![allow(clippy::dbg_macro)]
675 #![allow(clippy::mixed_attributes_style)]
676 #![allow(clippy::print_stderr)]
677 #![allow(clippy::print_stdout)]
678 #![allow(clippy::single_char_pattern)]
679 #![allow(clippy::unwrap_used)]
680 #![allow(clippy::unchecked_time_subtraction)]
681 #![allow(clippy::useless_vec)]
682 #![allow(clippy::needless_pass_by_value)]
683 use std::{sync::atomic::AtomicUsize, thread, time::Duration};
686
687 use io::{BufRead as _, BufReader, Write as _};
688 use rand::{Rng as _, SeedableRng as _, seq::SliceRandom as _};
689 use tor_basic_utils::{RngExt as _, test_rng::testing_rng};
690
691 use crate::{
692 msgs::request::{JsonMap, Request, ValidatedRequest},
693 nb_stream::PollingStream,
694 };
695
696 use super::*;
697
698 fn dummy_connected() -> (RpcConn, crate::testing::SocketpairStream) {
700 let (s1, s2) = crate::testing::construct_socketpair().unwrap();
701 let conn = RpcConn::new(PollingStream::new(s1).unwrap());
702
703 (conn, s2)
704 }
705
706 fn write_val(w: &mut impl io::Write, v: &serde_json::Value) {
707 let mut enc = serde_json::to_string(v).unwrap();
708 enc.push('\n');
709 w.write_all(enc.as_bytes()).unwrap();
710 }
711
712 #[test]
713 fn simple() {
714 let (conn, sock) = dummy_connected();
715
716 let user_thread = thread::spawn(move || {
717 let response1 = conn
718 .execute_internal_ok::<JsonMap>(
719 r#"{"obj":"fred","method":"arti:x-frob","params":{}}"#,
720 )
721 .unwrap();
722 (response1, conn)
723 });
724
725 let fake_arti_thread = thread::spawn(move || {
726 let mut sock = BufReader::new(sock);
727 let mut s = String::new();
728 let _len = sock.read_line(&mut s).unwrap();
729 let request = ValidatedRequest::from_string_strict(s.as_ref()).unwrap();
730 let response = serde_json::json!({
731 "id": request.id().clone(),
732 "result": { "xyz" : 3 }
733 });
734 write_val(sock.get_mut(), &response);
735 sock });
737
738 let _sock = fake_arti_thread.join().unwrap();
739 let (map, _conn) = user_thread.join().unwrap();
740 assert_eq!(map.get("xyz"), Some(&serde_json::Value::Number(3.into())));
741 }
742
743 #[test]
744 fn complex() {
745 use std::sync::atomic::Ordering::SeqCst;
746 let n_threads = 16;
747 let n_commands_per_thread = 128;
748 let n_commands_total = n_threads * n_commands_per_thread;
749 let n_completed = Arc::new(AtomicUsize::new(0));
750
751 let (conn, sock) = dummy_connected();
752 let conn = Arc::new(conn);
753 let mut user_threads = Vec::new();
754 let mut rng = testing_rng();
755
756 for th_idx in 0..n_threads {
759 let conn = Arc::clone(&conn);
760 let n_completed = Arc::clone(&n_completed);
761 let mut rng = rand_chacha::ChaCha12Rng::from_seed(rng.random());
762 let th = thread::spawn(move || {
763 for cmd_idx in 0..n_commands_per_thread {
764 let s = format!("{}:{}", th_idx, cmd_idx);
769 let want_updates: bool = rng.random();
770 let want_failure: bool = rng.random();
771 let req = serde_json::json!({
772 "obj":"fred",
773 "method":"arti:x-echo",
774 "meta": {
775 "updates": want_updates,
776 },
777 "params": {
778 "val": &s,
779 "fail": want_failure,
780 },
781 });
782 let req = serde_json::to_string(&req).unwrap();
783
784 let mut n_updates = 0;
786 let outcome = conn
787 .execute_with_updates(&req, |_update| {
788 n_updates += 1;
789 })
790 .unwrap();
791 assert_eq!(n_updates > 0, want_updates);
792
793 if want_failure {
795 let e = outcome.unwrap_err().decode();
796 assert_eq!(e.message(), "You asked me to fail");
797 assert_eq!(i32::from(e.code()), 33);
798 assert_eq!(
799 e.kinds_iter().collect::<Vec<_>>(),
800 vec!["Example".to_string()]
801 );
802 } else {
803 let success = outcome.unwrap();
804 let map = success.decode::<JsonMap>().unwrap();
805 assert_eq!(map.get("echo"), Some(&serde_json::Value::String(s)));
806 }
807 n_completed.fetch_add(1, SeqCst);
808 if rng.random::<f32>() < 0.02 {
809 thread::sleep(Duration::from_millis(3));
810 }
811 }
812 });
813 user_threads.push(th);
814 }
815
816 #[derive(serde::Deserialize, Debug)]
817 struct Echo {
818 val: String,
819 fail: bool,
820 }
821
822 let worker_rng = rand_chacha::ChaCha12Rng::from_seed(rng.random());
825 let worker_thread = thread::spawn(move || {
826 let mut rng = worker_rng;
827 let mut sock = BufReader::new(sock);
828 let mut pending: Vec<Request<Echo>> = Vec::new();
829 let mut n_received = 0;
830
831 let scramble_factor = 7;
833 let scramble_threshold =
837 n_commands_total - (n_commands_per_thread + 1) * scramble_factor;
838
839 'outer: loop {
840 let flush_pending_at = if n_received >= scramble_threshold {
841 1
842 } else {
843 scramble_factor
844 };
845
846 while pending.len() < flush_pending_at {
848 let mut buf = String::new();
849 if sock.read_line(&mut buf).unwrap() == 0 {
850 break 'outer;
851 }
852 n_received += 1;
853 let req: Request<Echo> = serde_json::from_str(&buf).unwrap();
854 pending.push(req);
855 }
856
857 let mut handling = std::mem::take(&mut pending);
859 handling.shuffle(&mut rng);
860
861 for req in handling {
862 if req.meta.unwrap_or_default().updates {
863 let n_updates = rng.gen_range_checked(1..4).unwrap();
864 for _ in 0..n_updates {
865 let up = serde_json::json!({
866 "id": req.id.clone(),
867 "update": {
868 "hello": req.params.val.clone(),
869 }
870 });
871 write_val(sock.get_mut(), &up);
872 }
873 }
874
875 let response = if req.params.fail {
876 serde_json::json!({
877 "id": req.id.clone(),
878 "error": { "message": "You asked me to fail", "code": 33, "kinds": ["Example"], "data": req.params.val },
879 })
880 } else {
881 serde_json::json!({
882 "id": req.id.clone(),
883 "result": {
884 "echo": req.params.val
885 }
886 })
887 };
888 write_val(sock.get_mut(), &response);
889 }
890 }
891 });
892 drop(conn);
893 for t in user_threads {
894 t.join().unwrap();
895 }
896
897 worker_thread.join().unwrap();
898
899 assert_eq!(n_completed.load(SeqCst), n_commands_total);
900 }
901
902 #[test]
903 fn arti_socket_closed() {
904 let n_threads = 16;
908
909 let (conn, sock) = dummy_connected();
910 let conn = Arc::new(conn);
911 let mut user_threads = Vec::new();
912 for _ in 0..n_threads {
913 let conn = Arc::clone(&conn);
914 let th = thread::spawn(move || {
915 let req = serde_json::json!({
918 "obj":"fred",
919 "method":"arti:x-echo",
920 "params":{}
921 });
922 let req = serde_json::to_string(&req).unwrap();
923 let outcome = conn.execute(&req);
924 if !matches!(
925 &outcome,
926 Err(ProtoError::Shutdown(ShutdownError::Write(_)))
927 | Err(ProtoError::Shutdown(ShutdownError::Read(_))),
928 ) {
929 dbg!(&outcome);
930 }
931
932 assert!(matches!(
933 outcome,
934 Err(ProtoError::Shutdown(ShutdownError::Write(_)))
935 | Err(ProtoError::Shutdown(ShutdownError::Read(_)))
936 | Err(ProtoError::Shutdown(ShutdownError::ConnectionClosed))
937 ));
938 });
939 user_threads.push(th);
940 }
941
942 drop(sock);
943
944 for t in user_threads {
945 t.join().unwrap();
946 }
947 }
948
949 fn proto_err_with_msg<F>(msg: &str, outcome_ok: F)
953 where
954 F: Fn(ProtoError) -> bool,
955 {
956 let n_threads = 16;
957
958 let (conn, mut sock) = dummy_connected();
959 let conn = Arc::new(conn);
960 let mut user_threads = Vec::new();
961 for _ in 0..n_threads {
962 let conn = Arc::clone(&conn);
963 let th = thread::spawn(move || {
964 let req = serde_json::json!({
967 "obj":"fred",
968 "method":"arti:x-echo",
969 "params":{}
970 });
971 let req = serde_json::to_string(&req).unwrap();
972 conn.execute(&req)
973 });
974 user_threads.push(th);
975 }
976
977 sock.write_all(msg.as_bytes()).unwrap();
978
979 for t in user_threads {
980 let outcome = t.join().unwrap();
981 assert!(outcome_ok(outcome.unwrap_err()));
982 }
983 }
984
985 #[test]
986 fn syntax_error() {
987 proto_err_with_msg("this is not json\n", |outcome| {
988 matches!(
989 outcome,
990 ProtoError::Shutdown(ShutdownError::ProtocolViolated(_))
991 )
992 });
993 }
994
995 #[test]
996 fn fatal_error() {
997 let j = serde_json::json!({
998 "error":{ "message": "This test is doomed", "code": 413, "kinds": ["Example"], "data": {} },
999 });
1000 let mut s = serde_json::to_string(&j).unwrap();
1001 s.push('\n');
1002
1003 proto_err_with_msg(&s, |outcome| {
1004 matches!(
1005 outcome,
1006 ProtoError::Shutdown(ShutdownError::ProtocolViolationReport(_))
1007 )
1008 });
1009 }
1010}