use buffa::view::{OwnedView, ViewReborrow};
use bytes::Bytes;
use crate::codec::CodecFormat;
use crate::codec::JsonSerialize;
use crate::codec::encode_json;
use crate::error::ConnectError;
use crate::request::HasMessageView;
use crate::response::Encodable;
/// A streamed message of type `M`, held as the view handle that
/// [`HasMessageView`] associates with `M` rather than as an owned `M`.
///
/// Use [`StreamMessage::view`] to read fields through the view,
/// [`StreamMessage::to_owned_message`] to obtain an owned `M`, and
/// [`StreamMessage::bytes`] to reach the underlying buffer.
pub struct StreamMessage<M>
where
    M: HasMessageView,
{
    /// The view handle wrapped by this message.
    inner: M::ViewHandle,
}
impl<M> StreamMessage<M>
where
    M: HasMessageView,
{
    /// Wraps an already-decoded [`OwnedView`] by converting it into
    /// `M::ViewHandle`.
    #[doc(hidden)]
    pub fn from_owned_view(inner: OwnedView<M::View<'static>>) -> Self {
        let handle = M::ViewHandle::from(inner);
        Self { inner: handle }
    }

    /// Builds a stream message from an owned message by encoding it and
    /// decoding the resulting bytes back into a view.
    ///
    /// The recursion and unknown-field limits are raised to their maximum
    /// values for this decode, since the input was produced by the encoder
    /// one line earlier.
    ///
    /// # Panics
    ///
    /// Panics if the freshly encoded bytes fail to decode.
    #[must_use]
    pub fn from_message(message: &M) -> Self {
        let encoded = Bytes::from(buffa::Message::encode_to_vec(message));
        let unlimited = buffa::DecodeOptions::new()
            .with_recursion_limit(u32::MAX)
            .with_unknown_field_limit(usize::MAX);
        let decoded: OwnedView<M::View<'static>> =
            OwnedView::decode_with_options(encoded, &unlimited)
                .expect("a just-encoded message always decodes");
        Self::from_owned_view(decoded)
    }

    /// Borrows the message as a view whose lifetime is tied to `&self`.
    #[must_use]
    pub fn view<'b>(&'b self) -> &'b M::View<'b>
    where
        M::View<'static>: ViewReborrow<Reborrowed<'b> = M::View<'b>>,
    {
        let owned_view = self.inner.as_ref();
        owned_view.reborrow()
    }

    /// Converts the view into an owned `M`.
    ///
    /// # Panics
    ///
    /// Panics if the underlying conversion reports an error.
    #[must_use]
    pub fn to_owned_message(&self) -> M {
        let converted = self.inner.as_ref().to_owned_message();
        converted.expect("wire-decoded view always converts (buffa >= 0.8.1)")
    }

    /// Returns the [`Bytes`] buffer exposed by the underlying owned view.
    #[must_use]
    pub fn bytes(&self) -> &Bytes {
        let owned_view = self.inner.as_ref();
        owned_view.bytes()
    }
}
/// Dereferences to the wrapped `M::ViewHandle`, so the handle's methods can
/// be called directly on a `StreamMessage`.
impl<M> core::ops::Deref for StreamMessage<M>
where
    M: HasMessageView,
{
    type Target = M::ViewHandle;

    fn deref(&self) -> &Self::Target {
        let Self { inner } = self;
        inner
    }
}
/// Cloning is available whenever the wrapped view handle is itself `Clone`;
/// the clone is produced by cloning that handle.
impl<M> Clone for StreamMessage<M>
where
    M: HasMessageView,
    M::ViewHandle: Clone,
{
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
/// Formats exactly as the wrapped view handle does; the wrapper adds no
/// output of its own.
impl<M> core::fmt::Debug for StreamMessage<M>
where
    M: HasMessageView,
    M::ViewHandle: core::fmt::Debug,
{
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        core::fmt::Debug::fmt(&self.inner, f)
    }
}
impl<M> Encodable<M> for StreamMessage<M>
where
    M: HasMessageView + JsonSerialize,
{
    /// Encodes the message for the requested codec.
    ///
    /// * `CodecFormat::Proto` returns a clone of the buffer from
    ///   [`StreamMessage::bytes`]; the message is not serialized again.
    /// * `CodecFormat::Json` converts the view to an owned `M` and passes it
    ///   to `encode_json`, whose result (including any error) is returned
    ///   unchanged.
    fn encode(&self, codec: CodecFormat) -> Result<Bytes, ConnectError> {
        match codec {
            CodecFormat::Json => {
                let owned = self.to_owned_message();
                encode_json(&owned)
            }
            CodecFormat::Proto => Ok(self.bytes().clone()),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use buffa::Message;
    use buffa_types::google::protobuf::StringValue;

    /// Encodes a `StringValue` holding `value`, decodes it into an
    /// `OwnedView`, and wraps that view in a `StreamMessage`.
    fn message(value: &str) -> StreamMessage<StringValue> {
        let source = StringValue {
            value: value.into(),
            ..Default::default()
        };
        let wire = Bytes::from(source.encode_to_vec());
        let decoded = OwnedView::decode(wire).expect("decode");
        StreamMessage::from_owned_view(decoded)
    }

    /// Exercises the accessors (`view`, `to_owned_message`, `bytes`) together
    /// with the `Clone` and `Debug` impls.
    #[test]
    fn view_to_owned_and_bytes() {
        let msg = message("streamed");
        assert_eq!(msg.view().value, "streamed");
        assert_eq!(msg.to_owned_message().value, "streamed");

        // The string seen through the view must start inside the buffer
        // returned by `bytes()`.
        let buffer_span = msg.bytes().as_ptr_range();
        let field_start = msg.view().value.as_ptr();
        assert!(buffer_span.contains(&field_start));

        let duplicate = msg.clone();
        let original_debug = format!("{msg:?}");
        let duplicate_debug = format!("{duplicate:?}");
        assert_eq!(original_debug, duplicate_debug);
    }

    /// Proto encoding must return the very same buffer (pointer-equal), and
    /// JSON encoding must match serializing the owned message directly.
    #[cfg(feature = "json")]
    #[test]
    fn encodable_forwards_proto_bytes_without_reencoding() {
        let msg = message("forward me");
        let source_bytes = msg.bytes().clone();

        let proto = msg.encode(CodecFormat::Proto).expect("proto encode");
        assert_eq!(proto, source_bytes);
        assert_eq!(proto.as_ptr(), source_bytes.as_ptr());

        let json = msg.encode(CodecFormat::Json).expect("json encode");
        let expected_json = serde_json::to_vec(&msg.to_owned_message()).unwrap();
        assert_eq!(&json[..], expected_json.as_slice());
        assert_eq!(&json[..], br#""forward me""#);
    }

    /// Without the `json` feature, Proto encoding still succeeds while JSON
    /// encoding reports `Unimplemented`.
    #[cfg(not(feature = "json"))]
    #[test]
    fn encode_json_is_unimplemented_without_feature() {
        let msg = message("forward me");
        assert!(msg.encode(CodecFormat::Proto).is_ok());

        let failure = msg.encode(CodecFormat::Json).unwrap_err();
        assert_eq!(failure.code, crate::ErrorCode::Unimplemented);
    }
}