1use std::{fmt::Debug, io::Write, str::from_utf8};
2
3use async_trait::async_trait;
4use quick_xml::{
5 events::{BytesStart, Event},
6 name::{Namespace, ResolveResult},
7 NsReader, Writer,
8};
9
10use crate::{
11 transport::{RecvHandle, SendHandle},
12 Error,
13};
14
15mod error;
16pub use self::error::{Read as ReadError, Write as WriteError};
17
18mod hello;
19pub(crate) use self::hello::{ClientHello, ServerHello};
20
21pub mod rpc;
22
23pub(crate) mod xmlns;
24
/// Byte sequence that terminates a serialized message.
///
/// `ClientMsg::to_xml` appends it to every outgoing message, and `ServerMsg::from_xml` stops
/// parsing when it meets a text node equal to it. The bytes `]]>]]>` are the end-of-message
/// delimiter defined for NETCONF over SSH (RFC 6242, section 4.3).
pub(crate) const MARKER: &[u8] = b"]]>]]>";
26
27pub trait ReadXml: Sized {
28 fn read_xml(reader: &mut NsReader<&[u8]>, start: &BytesStart<'_>) -> Result<Self, ReadError>;
29}
30
31pub trait WriteXml {
32 fn write_xml<W: Write>(&self, writer: &mut Writer<W>) -> Result<(), WriteError>;
33}
34
35#[async_trait]
36pub trait ClientMsg: WriteXml + Debug {
37 #[tracing::instrument(skip(self), level = "debug")]
38 fn to_xml(&self) -> Result<String, WriteError> {
39 let mut buf = Vec::new();
40 let mut writer = Writer::new(&mut buf);
41 self.write_xml(&mut writer)?;
42 buf.extend_from_slice(MARKER);
43 Ok(String::from_utf8(buf)?)
44 }
45
46 #[tracing::instrument(skip(self, sender), level = "trace")]
47 async fn send<T: SendHandle>(&self, sender: &mut T) -> Result<(), Error> {
48 tracing::debug!("sending message");
49 let serialized = self.to_xml()?;
50 tracing::debug!(serialized);
51 sender.send(serialized.into()).await
52 }
53}
54
55#[async_trait]
56pub trait ServerMsg: ReadXml {
57 const TAG_NS: Namespace<'static>;
58 const TAG_NAME: &'static str;
59
60 #[tracing::instrument(skip(input), level = "debug")]
61 fn from_xml<S>(input: S) -> Result<Self, ReadError>
62 where
63 S: AsRef<str> + Debug,
64 {
65 tracing::debug!(input = input.as_ref());
66 let mut reader = NsReader::from_str(input.as_ref());
67 _ = reader.trim_text(true);
68 tracing::debug!("expecting <{}>", Self::TAG_NAME);
69 let mut this = None;
70 loop {
71 match reader.read_resolved_event()? {
72 (ResolveResult::Bound(ns), Event::Start(tag))
73 if ns == Self::TAG_NS
74 && tag.local_name().as_ref() == Self::TAG_NAME.as_bytes() =>
75 {
76 this = Some(Self::read_xml(&mut reader, &tag)?);
77 }
78 (_, Event::Comment(_)) => continue,
79 (_, Event::Eof) => break,
80 (_, Event::Text(txt)) if &*txt == MARKER => break,
81 (ns, event) => {
84 tracing::error!(?event, ?ns, "unexpected xml event");
85 return Err(ReadError::UnexpectedXmlEvent(event.into_owned()));
86 }
87 }
88 }
89 this.ok_or_else(|| ReadError::missing_element(Self::TAG_NAME, Self::TAG_NAME))
90 }
91
92 #[tracing::instrument(skip(receiver), level = "trace")]
93 async fn recv<T: RecvHandle>(receiver: &mut T) -> Result<Self, Error> {
94 tracing::debug!("receiving message");
95 let bytes = receiver.recv().await?;
96 let serialized = from_utf8(&bytes).map_err(ReadError::DecodeMessage)?;
97 Ok(Self::from_xml(serialized)?)
98 }
99}