use std::time::{SystemTime, UNIX_EPOCH};
use crate::channels::wasm::capabilities::{ChannelCapabilities, EmitRateLimitConfig};
use crate::channels::wasm::error::WasmChannelError;
use crate::tools::wasm::{HostState, LogLevel};
const MAX_EMITS_PER_EXECUTION: usize = 100;
const MAX_MESSAGE_CONTENT_SIZE: usize = 64 * 1024;
#[derive(Debug, Clone)]
pub struct EmittedMessage {
pub user_id: String,
pub user_name: Option<String>,
pub content: String,
pub thread_id: Option<String>,
pub metadata_json: String,
pub emitted_at_millis: u64,
}
impl EmittedMessage {
pub fn new(user_id: impl Into<String>, content: impl Into<String>) -> Self {
Self {
user_id: user_id.into(),
user_name: None,
content: content.into(),
thread_id: None,
metadata_json: "{}".to_string(),
emitted_at_millis: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
}
}
pub fn with_user_name(mut self, name: impl Into<String>) -> Self {
self.user_name = Some(name.into());
self
}
pub fn with_thread_id(mut self, thread_id: impl Into<String>) -> Self {
self.thread_id = Some(thread_id.into());
self
}
pub fn with_metadata(mut self, metadata_json: impl Into<String>) -> Self {
self.metadata_json = metadata_json.into();
self
}
}
#[derive(Debug, Clone)]
pub struct PendingWorkspaceWrite {
pub path: String,
pub content: String,
}
pub struct ChannelHostState {
base: HostState,
channel_name: String,
capabilities: ChannelCapabilities,
emitted_messages: Vec<EmittedMessage>,
pending_writes: Vec<PendingWorkspaceWrite>,
emit_count: u32,
emit_enabled: bool,
emits_dropped: usize,
}
impl std::fmt::Debug for ChannelHostState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ChannelHostState")
.field("channel_name", &self.channel_name)
.field("emitted_messages_count", &self.emitted_messages.len())
.field("pending_writes_count", &self.pending_writes.len())
.field("emit_count", &self.emit_count)
.field("emit_enabled", &self.emit_enabled)
.field("emits_dropped", &self.emits_dropped)
.finish()
}
}
impl ChannelHostState {
pub fn new(channel_name: impl Into<String>, capabilities: ChannelCapabilities) -> Self {
let base = HostState::new(capabilities.tool_capabilities.clone());
Self {
base,
channel_name: channel_name.into(),
capabilities,
emitted_messages: Vec::new(),
pending_writes: Vec::new(),
emit_count: 0,
emit_enabled: true,
emits_dropped: 0,
}
}
pub fn channel_name(&self) -> &str {
&self.channel_name
}
pub fn capabilities(&self) -> &ChannelCapabilities {
&self.capabilities
}
pub fn base(&self) -> &HostState {
&self.base
}
pub fn base_mut(&mut self) -> &mut HostState {
&mut self.base
}
pub fn emit_message(&mut self, msg: EmittedMessage) -> Result<(), WasmChannelError> {
if !self.emit_enabled {
self.emits_dropped += 1;
return Ok(()); }
if self.emitted_messages.len() >= MAX_EMITS_PER_EXECUTION {
self.emit_enabled = false;
self.emits_dropped += 1;
tracing::warn!(
channel = %self.channel_name,
limit = MAX_EMITS_PER_EXECUTION,
"Channel emit limit reached, further messages dropped"
);
return Ok(());
}
if msg.content.len() > MAX_MESSAGE_CONTENT_SIZE {
tracing::warn!(
channel = %self.channel_name,
size = msg.content.len(),
max = MAX_MESSAGE_CONTENT_SIZE,
"Message content too large, truncating"
);
let mut truncated = msg.content[..MAX_MESSAGE_CONTENT_SIZE].to_string();
truncated.push_str("... (truncated)");
let msg = EmittedMessage {
content: truncated,
..msg
};
self.emitted_messages.push(msg);
} else {
self.emitted_messages.push(msg);
}
self.emit_count += 1;
Ok(())
}
pub fn take_emitted_messages(&mut self) -> Vec<EmittedMessage> {
std::mem::take(&mut self.emitted_messages)
}
pub fn emitted_count(&self) -> usize {
self.emitted_messages.len()
}
pub fn emits_dropped(&self) -> usize {
self.emits_dropped
}
pub fn workspace_write(&mut self, path: &str, content: String) -> Result<(), WasmChannelError> {
let full_path = self
.capabilities
.validate_workspace_path(path)
.map_err(|reason| WasmChannelError::WorkspaceEscape {
name: self.channel_name.clone(),
path: reason,
})?;
self.pending_writes.push(PendingWorkspaceWrite {
path: full_path,
content,
});
Ok(())
}
pub fn take_pending_writes(&mut self) -> Vec<PendingWorkspaceWrite> {
std::mem::take(&mut self.pending_writes)
}
pub fn pending_writes_count(&self) -> usize {
self.pending_writes.len()
}
pub fn log(
&mut self,
level: LogLevel,
message: String,
) -> Result<(), crate::tools::wasm::WasmError> {
self.base.log(level, message)
}
pub fn now_millis(&self) -> u64 {
self.base.now_millis()
}
pub fn workspace_read(
&self,
path: &str,
) -> Result<Option<String>, crate::tools::wasm::WasmError> {
let full_path = self.capabilities.prefix_workspace_path(path);
self.base.workspace_read(&full_path)
}
pub fn secret_exists(&self, name: &str) -> bool {
self.base.secret_exists(name)
}
pub fn check_http_allowed(&self, url: &str, method: &str) -> Result<(), String> {
self.base.check_http_allowed(url, method)
}
pub fn record_http_request(&mut self) -> Result<(), String> {
self.base.record_http_request()
}
pub fn take_logs(&mut self) -> Vec<crate::tools::wasm::LogEntry> {
self.base.take_logs()
}
}
pub struct ChannelEmitRateLimiter {
config: EmitRateLimitConfig,
minute_window: RateWindow,
hour_window: RateWindow,
}
struct RateWindow {
count: u32,
window_start: u64,
window_duration_ms: u64,
}
impl RateWindow {
fn new(duration_ms: u64) -> Self {
Self {
count: 0,
window_start: 0,
window_duration_ms: duration_ms,
}
}
fn check_and_record(&mut self, now_ms: u64, limit: u32) -> bool {
if now_ms.saturating_sub(self.window_start) > self.window_duration_ms {
self.count = 0;
self.window_start = now_ms;
}
if self.count >= limit {
return false;
}
self.count += 1;
true
}
}
#[allow(dead_code)]
impl ChannelEmitRateLimiter {
pub fn new(config: EmitRateLimitConfig) -> Self {
Self {
config,
minute_window: RateWindow::new(60_000), hour_window: RateWindow::new(3_600_000), }
}
pub fn check_and_record(&mut self) -> bool {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let minute_ok = self
.minute_window
.check_and_record(now, self.config.messages_per_minute);
let hour_ok = self
.hour_window
.check_and_record(now, self.config.messages_per_hour);
minute_ok && hour_ok
}
pub fn minute_count(&self) -> u32 {
self.minute_window.count
}
pub fn hour_count(&self) -> u32 {
self.hour_window.count
}
}
#[cfg(test)]
mod tests {
use crate::channels::wasm::capabilities::{ChannelCapabilities, EmitRateLimitConfig};
use crate::channels::wasm::host::{
ChannelEmitRateLimiter, ChannelHostState, EmittedMessage, MAX_EMITS_PER_EXECUTION,
};
#[test]
fn test_emit_message_basic() {
let caps = ChannelCapabilities::for_channel("test");
let mut state = ChannelHostState::new("test", caps);
let msg = EmittedMessage::new("user123", "Hello, world!");
state.emit_message(msg).unwrap();
assert_eq!(state.emitted_count(), 1);
let messages = state.take_emitted_messages();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].user_id, "user123");
assert_eq!(messages[0].content, "Hello, world!");
assert_eq!(state.emitted_count(), 0);
}
#[test]
fn test_emit_message_with_metadata() {
let caps = ChannelCapabilities::for_channel("test");
let mut state = ChannelHostState::new("test", caps);
let msg = EmittedMessage::new("user123", "Hello")
.with_user_name("John Doe")
.with_thread_id("thread-1")
.with_metadata(r#"{"key": "value"}"#);
state.emit_message(msg).unwrap();
let messages = state.take_emitted_messages();
assert_eq!(messages[0].user_name, Some("John Doe".to_string()));
assert_eq!(messages[0].thread_id, Some("thread-1".to_string()));
assert_eq!(messages[0].metadata_json, r#"{"key": "value"}"#);
}
#[test]
fn test_emit_per_execution_limit() {
let caps = ChannelCapabilities::for_channel("test");
let mut state = ChannelHostState::new("test", caps);
for i in 0..MAX_EMITS_PER_EXECUTION {
let msg = EmittedMessage::new("user", format!("Message {}", i));
state.emit_message(msg).unwrap();
}
let msg = EmittedMessage::new("user", "Should be dropped");
state.emit_message(msg).unwrap();
assert_eq!(state.emitted_count(), MAX_EMITS_PER_EXECUTION);
assert_eq!(state.emits_dropped(), 1);
}
#[test]
fn test_workspace_write_prefixing() {
let caps = ChannelCapabilities::for_channel("slack");
let mut state = ChannelHostState::new("slack", caps);
state
.workspace_write("state.json", "{}".to_string())
.unwrap();
let writes = state.take_pending_writes();
assert_eq!(writes.len(), 1);
assert_eq!(writes[0].path, "channels/slack/state.json");
}
#[test]
fn test_workspace_write_path_traversal_blocked() {
let caps = ChannelCapabilities::for_channel("slack");
let mut state = ChannelHostState::new("slack", caps);
let result = state.workspace_write("../secrets.json", "{}".to_string());
assert!(result.is_err());
let result = state.workspace_write("/etc/passwd", "{}".to_string());
assert!(result.is_err());
}
#[test]
fn test_rate_limiter_basic() {
let config = EmitRateLimitConfig {
messages_per_minute: 10,
messages_per_hour: 100,
};
let mut limiter = ChannelEmitRateLimiter::new(config);
for _ in 0..10 {
assert!(limiter.check_and_record());
}
assert!(!limiter.check_and_record());
}
#[test]
fn test_channel_name() {
let caps = ChannelCapabilities::for_channel("telegram");
let state = ChannelHostState::new("telegram", caps);
assert_eq!(state.channel_name(), "telegram");
}
}