1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
use std::task::Poll;
use anyhow::Result;
use chrono::{DateTime, Local};
use futures::{Stream, StreamExt};
use tracing::debug;
use wasmcloud_control_interface::ComponentId;
/// A struct that represents an invocation that was observed by the spier.
#[derive(Debug)]
pub struct ObservedInvocation {
/// The timestamp when this was received
pub timestamp: DateTime<Local>,
/// The name or id of the entity that sent this invocation
pub from: String,
/// The name or id of the entity that received this invocation
pub to: String,
/// The operation that was invoked
pub operation: String,
/// The inner message that was received. We will attempt to parse the inner message from CBOR
/// and JSON into a JSON string and fall back to the raw bytes if we are unable to do so
pub message: ObservedMessage,
}
/// A inner message that we've seen in an invocation message. This will either be a raw bytes or a
/// parsed value if it was a format we recognized.
///
/// Please note that this struct is meant for debugging, so its `Display` implementation does some
/// heavier lifting like contructing strings from the raw bytes.
#[derive(Debug)]
pub enum ObservedMessage {
Raw(Vec<u8>),
Parsed(String),
}
impl std::fmt::Display for ObservedMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ObservedMessage::Raw(bytes) => write!(f, "{}", String::from_utf8_lossy(bytes)),
ObservedMessage::Parsed(v) => {
write!(f, "{v}")
}
}
}
}
impl ObservedMessage {
#[must_use]
pub fn parse(data: Vec<u8>) -> Self {
Self::Parsed(String::from_utf8_lossy(&data).to_string())
}
}
/// A struct that can spy on the RPC messages sent to and from an component, consumable as a stream
pub struct Spier {
stream: futures::stream::SelectAll<async_nats::Subscriber>,
component_id: ComponentId,
friendly_name: Option<String>,
}
impl Spier {
/// Creates a new Spier instance for the given component. Will return an error if the component cannot
/// be found or if there are connection issues
pub async fn new(
component_id: &str,
ctl_client: &wasmcloud_control_interface::Client,
nats_client: &async_nats::Client,
) -> Result<Self> {
let linked_component = get_linked_components(component_id, ctl_client).await?;
let lattice = &ctl_client.lattice;
let rpc_topic = format!("{lattice}.{component_id}.wrpc.>");
let component_stream = nats_client.subscribe(rpc_topic).await?;
let mut subs = futures::future::join_all(linked_component.iter().map(|prov| {
let topic = format!("{lattice}.{}.wrpc.>", &prov.id);
nats_client.subscribe(topic)
}))
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
subs.push(component_stream);
let stream = futures::stream::select_all(subs);
Ok(Self {
stream,
component_id: component_id.to_string(),
friendly_name: None,
})
}
/// Returns the component name, or id if no name is set, that this spier is spying on
pub fn component_id(&self) -> &str {
self.friendly_name
.as_deref()
.unwrap_or_else(|| self.component_id.as_ref())
}
}
impl Stream for Spier {
type Item = ObservedInvocation;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.stream.poll_next_unpin(cx) {
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(msg)) => {
// <lattice>.<component>.wrpc.0.0.1.<operation>@<versionX.Y.Z>.<function>
let mut subject_parts = msg.subject.split('.');
subject_parts.next(); // Skip the lattice
let component_id = subject_parts.next();
// Skip "wrpc.0.0.1", collect the rest
let operation = subject_parts.skip(4).collect::<Vec<_>>();
// The length assertion is to ensure that at least the `operation.function` is present since the
// version is technically optional.
if component_id.is_none() || operation.len() < 2 {
debug!("Received invocation with invalid subject: {}", msg.subject);
cx.waker().wake_by_ref();
return Poll::Pending;
}
let component_id = component_id.unwrap();
let (from, to) = if component_id == self.component_id {
// Attempt to get the source from the message header
let from = msg
.headers
.and_then(|headers| headers.get("source-id").map(ToString::to_string))
.unwrap_or_else(|| "linked component".to_string());
(from, (*component_id).to_string())
} else {
(self.component_id.to_string(), (*component_id).to_string())
};
// NOTE(thomastaylor312): Ideally we'd consume `msg.payload` above with a
// `Cursor` and `from_reader` and then manually reconstruct the acking using the
// message context, but I didn't want to waste time optimizing yet
Poll::Ready(Some(ObservedInvocation {
timestamp: Local::now(),
from,
to,
operation: operation.join("."),
message: ObservedMessage::parse(msg.payload.to_vec()),
}))
}
Poll::Pending => Poll::Pending,
}
}
}
#[derive(Debug)]
struct ProviderDetails {
id: ComponentId,
}
/// Fetches all components linked to the given component
async fn get_linked_components(
component_id: &str,
ctl_client: &wasmcloud_control_interface::Client,
) -> Result<Vec<ProviderDetails>> {
let details = ctl_client
.get_links()
.await
.map_err(|e| anyhow::anyhow!("Unable to get links: {e:?}"))
.map(|response| response.response)?
.map(|linkdefs| {
linkdefs
.into_iter()
.filter_map(|link| {
if link.source_id == component_id {
Some(ProviderDetails { id: link.target })
} else if link.target == component_id {
Some(ProviderDetails { id: link.source_id })
} else {
None
}
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
Ok(details)
}