use buffa::view::{OwnedView, ViewReborrow};
use bytes::Bytes;
use serde::Serialize;
use crate::codec::CodecFormat;
use crate::error::ConnectError;
use crate::request::HasMessageView;
use crate::response::Encodable;
/// Wrapper around the view handle (`M::ViewHandle`) of a message of type `M`.
///
/// The handle is kept private; callers reach the message through the
/// accessor methods and trait implementations defined on this type.
pub struct StreamMessage<M>
where
    M: HasMessageView,
{
    /// Handle produced from a decoded `OwnedView` of the message.
    inner: M::ViewHandle,
}
impl<M> StreamMessage<M>
where
    M: HasMessageView,
{
    /// Builds a `StreamMessage` from an already-decoded [`OwnedView`].
    ///
    /// The owned view is converted into `M::ViewHandle` through that type's
    /// `From` implementation. The constructor is excluded from rendered
    /// documentation by `#[doc(hidden)]`.
    #[doc(hidden)]
    pub fn from_owned_view(inner: OwnedView<M::View<'static>>) -> Self {
        let handle = M::ViewHandle::from(inner);
        Self { inner: handle }
    }

    /// Borrows the message view with its lifetime shortened to that of `&self`.
    ///
    /// The handle stores a `'static` view; the [`ViewReborrow`] bound is what
    /// allows handing it out as `M::View<'b>` tied to this borrow.
    #[must_use]
    pub fn view<'b>(&'b self) -> &'b M::View<'b>
    where
        M::View<'static>: ViewReborrow<Reborrowed<'b> = M::View<'b>>,
    {
        let owned_view = self.inner.as_ref();
        owned_view.reborrow()
    }

    /// Produces an owned `M` from the stored view.
    #[must_use]
    pub fn to_owned_message(&self) -> M {
        let owned_view = self.inner.as_ref();
        owned_view.to_owned_message()
    }

    /// Returns a reference to the [`Bytes`] buffer held by the stored view.
    #[must_use]
    pub fn bytes(&self) -> &Bytes {
        let owned_view = self.inner.as_ref();
        owned_view.bytes()
    }
}
/// Dereferences to the wrapped `M::ViewHandle`, so the handle's own methods
/// can be called directly on a `StreamMessage`.
impl<M> core::ops::Deref for StreamMessage<M>
where
    M: HasMessageView,
{
    type Target = M::ViewHandle;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
/// Cloning clones the wrapped handle.
///
/// Written by hand so the bound lands on `M::ViewHandle` rather than on `M`
/// itself, which is what `#[derive(Clone)]` would require.
impl<M> Clone for StreamMessage<M>
where
    M: HasMessageView,
    M::ViewHandle: Clone,
{
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
/// Formats exactly as the wrapped handle does; the wrapper contributes no
/// output of its own.
impl<M> core::fmt::Debug for StreamMessage<M>
where
    M: HasMessageView,
    M::ViewHandle: core::fmt::Debug,
{
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        core::fmt::Debug::fmt(&self.inner, f)
    }
}
/// Encodes the message for a response in the requested codec.
impl<M> Encodable<M> for StreamMessage<M>
where
    M: HasMessageView + Serialize,
{
    /// Returns the encoded payload for `codec`.
    ///
    /// * `CodecFormat::Proto` — hands back a clone of the stored [`Bytes`]
    ///   instead of encoding the message again.
    /// * `CodecFormat::Json` — converts the view into an owned `M` and
    ///   serializes it with `serde_json`.
    ///
    /// # Errors
    ///
    /// A JSON serialization failure is reported through
    /// `ConnectError::internal` with the serializer's message appended.
    fn encode(&self, codec: CodecFormat) -> Result<Bytes, ConnectError> {
        let owned = match codec {
            CodecFormat::Proto => {
                let stored = self.inner.as_ref().bytes();
                return Ok(stored.clone());
            }
            // JSON has no stored representation, so an owned message is needed.
            CodecFormat::Json => self.to_owned_message(),
        };

        match serde_json::to_vec(&owned) {
            Ok(buf) => Ok(Bytes::from(buf)),
            Err(e) => Err(ConnectError::internal(format!(
                "failed to encode JSON response: {e}"
            ))),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use buffa::Message;
    use buffa_types::google::protobuf::StringValue;

    /// Encodes a `StringValue` carrying `value`, then decodes the bytes back
    /// into a `StreamMessage` for use as a test fixture.
    fn message(value: &str) -> StreamMessage<StringValue> {
        let source = StringValue {
            value: value.into(),
            ..Default::default()
        };
        let encoded = Bytes::from(source.encode_to_vec());
        StreamMessage::from_owned_view(OwnedView::decode(encoded).expect("decode"))
    }

    /// Exercises the accessors, `Clone`, and `Debug` on one message.
    #[test]
    fn view_to_owned_and_bytes() {
        let msg = message("streamed");

        // Both the borrowed view and the owned conversion expose the payload.
        assert_eq!(msg.view().value, "streamed");
        assert_eq!(msg.to_owned_message().value, "streamed");

        // The view's string must point into the buffer returned by `bytes()`.
        let buffer_span = msg.bytes().as_ptr_range();
        let value_ptr = msg.view().value.as_ptr();
        assert!(buffer_span.contains(&value_ptr));

        // A clone renders identically through `Debug`.
        let cloned = msg.clone();
        assert_eq!(format!("{msg:?}"), format!("{cloned:?}"));
    }

    /// Proto encoding must return the stored buffer itself; JSON encoding must
    /// match serializing the owned message directly.
    #[test]
    fn encodable_forwards_proto_bytes_without_reencoding() {
        let msg = message("forward me");
        let stored = msg.bytes().clone();

        let via_proto = msg.encode(CodecFormat::Proto).expect("proto encode");
        assert_eq!(via_proto, stored);
        // Same start pointer: the buffer was forwarded, not rebuilt.
        assert_eq!(via_proto.as_ptr(), stored.as_ptr());

        let via_json = msg.encode(CodecFormat::Json).expect("json encode");
        let expected_json = serde_json::to_vec(&msg.to_owned_message()).unwrap();
        assert_eq!(via_json.as_ref(), expected_json.as_slice());
    }
}