1use std::net::{IpAddr, ToSocketAddrs};
24use std::str::FromStr;
25
26use anyhow::{Context, Result, anyhow};
27use rusqlite::{Connection, params};
28use serde::{Deserialize, Serialize};
29use sha2::{Digest, Sha256};
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct Subscription {
34 pub id: String,
35 pub url: String,
36 pub events: String,
37 pub namespace_filter: Option<String>,
38 pub agent_filter: Option<String>,
39 pub created_by: Option<String>,
40 pub created_at: String,
41 pub dispatch_count: i64,
42 pub failure_count: i64,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub event_types: Option<Vec<String>>,
50}
51
/// Borrowed input for [`insert`]: everything needed to register a webhook.
pub struct NewSubscription<'a> {
    /// Destination URL; must pass [`validate_url`].
    pub url: &'a str,
    /// Legacy comma-separated event list (or `"*"`); stored verbatim when
    /// `event_types` is `None`.
    pub events: &'a str,
    /// Optional shared secret. Only its SHA-256 hex digest is persisted.
    pub secret: Option<&'a str>,
    /// Optional namespace restriction.
    pub namespace_filter: Option<&'a str>,
    /// Optional agent restriction.
    pub agent_filter: Option<&'a str>,
    /// Optional creator label.
    pub created_by: Option<&'a str>,
    /// Optional structured event list. Every entry must appear in
    /// [`WEBHOOK_EVENT_TYPES`]; when given it also determines the stored
    /// `events` CSV.
    pub event_types: Option<&'a [String]>,
}
66
/// Event names accepted in `NewSubscription::event_types`.
///
/// [`insert`] rejects any structured event type that is not listed here.
pub const WEBHOOK_EVENT_TYPES: &[&str] = &[
    "memory_store",
    "memory_promote",
    "memory_delete",
    "memory_link_created",
    "memory_consolidated",
];
77
78pub fn insert(conn: &Connection, req: &NewSubscription<'_>) -> Result<String> {
89 validate_url(req.url)?;
90 let id = uuid::Uuid::new_v4().to_string();
91 let secret_hash = req.secret.map(sha256_hex);
92 let now = chrono::Utc::now().to_rfc3339();
93
94 let (events_csv, event_types_json) = if let Some(list) = req.event_types {
96 for ev in list {
97 if !WEBHOOK_EVENT_TYPES.contains(&ev.as_str()) {
98 return Err(anyhow!(
99 "unknown webhook event type {ev:?}; valid types: {WEBHOOK_EVENT_TYPES:?}"
100 ));
101 }
102 }
103 let csv = list.join(",");
105 let json = serde_json::to_string(list).context("event_types serialise")?;
106 (csv, Some(json))
107 } else {
108 (req.events.to_string(), None)
109 };
110
111 conn.execute(
112 "INSERT INTO subscriptions (id, url, events, secret_hash, namespace_filter, agent_filter, created_by, created_at, event_types) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
113 params![id, req.url, events_csv, secret_hash, req.namespace_filter, req.agent_filter, req.created_by, now, event_types_json],
114 )?;
115 Ok(id)
116}
117
118pub fn delete(conn: &Connection, id: &str) -> Result<bool> {
120 let n = conn.execute("DELETE FROM subscriptions WHERE id = ?1", params![id])?;
121 Ok(n > 0)
122}
123
124pub fn list(conn: &Connection) -> Result<Vec<Subscription>> {
126 let mut stmt = conn.prepare(
127 "SELECT id, url, events, namespace_filter, agent_filter, created_by, created_at, dispatch_count, failure_count, event_types FROM subscriptions ORDER BY created_at DESC",
128 )?;
129 let rows = stmt.query_map([], |row| {
130 let event_types_raw: Option<String> = row.get(9)?;
131 let event_types =
134 event_types_raw.and_then(|s| match serde_json::from_str::<Vec<String>>(&s) {
135 Ok(v) => Some(v),
136 Err(e) => {
137 tracing::warn!(
138 "subscription event_types JSON decode failed, treating as all-events: {e}"
139 );
140 None
141 }
142 });
143 Ok(Subscription {
144 id: row.get(0)?,
145 url: row.get(1)?,
146 events: row.get(2)?,
147 namespace_filter: row.get(3)?,
148 agent_filter: row.get(4)?,
149 created_by: row.get(5)?,
150 created_at: row.get(6)?,
151 dispatch_count: row.get(7)?,
152 failure_count: row.get(8)?,
153 event_types,
154 })
155 })?;
156 rows.collect::<rusqlite::Result<Vec<_>>>()
157 .context("subscription row decode failed")
158}
159
160pub fn list_by_event(conn: &Connection, event_type: &str) -> Result<Vec<Subscription>> {
170 let pattern = format!("%{event_type}%");
178 let mut stmt = conn.prepare(
179 "SELECT id, url, events, namespace_filter, agent_filter, created_by, created_at, dispatch_count, failure_count, event_types FROM subscriptions WHERE event_types IS NULL OR event_types LIKE ?1 ORDER BY created_at DESC",
180 )?;
181 let rows = stmt.query_map(params![pattern], |row| {
182 let event_types_raw: Option<String> = row.get(9)?;
183 let event_types =
184 event_types_raw.and_then(|s| serde_json::from_str::<Vec<String>>(&s).ok());
185 Ok(Subscription {
186 id: row.get(0)?,
187 url: row.get(1)?,
188 events: row.get(2)?,
189 namespace_filter: row.get(3)?,
190 agent_filter: row.get(4)?,
191 created_by: row.get(5)?,
192 created_at: row.get(6)?,
193 dispatch_count: row.get(7)?,
194 failure_count: row.get(8)?,
195 event_types,
196 })
197 })?;
198 let mut out: Vec<Subscription> = Vec::new();
199 for sub in rows {
200 let s = sub.context("subscription row decode failed")?;
201 match &s.event_types {
202 None => out.push(s),
203 Some(list) if list.iter().any(|e| e == event_type) => out.push(s),
204 Some(_) => {} }
206 }
207 Ok(out)
208}
209
/// Decides whether a subscription should receive a given event.
///
/// * Event: when `sub_event_types` is present it is authoritative (exact
///   match against `event`; an empty list matches nothing). Otherwise the
///   legacy `sub_events` CSV is used, where `"*"` (alone or as one trimmed
///   entry) matches everything.
/// * Namespace: a non-empty `sub_namespace` must equal `namespace`.
/// * Agent: a non-empty `sub_agent` must equal `agent`; an event without an
///   agent never satisfies a non-empty agent filter.
fn matches_filters(
    sub_events: &str,
    sub_event_types: Option<&[String]>,
    sub_namespace: Option<&str>,
    sub_agent: Option<&str>,
    event: &str,
    namespace: &str,
    agent: Option<&str>,
) -> bool {
    let event_ok = match sub_event_types {
        Some(types) => types.iter().any(|t| t == event),
        None => {
            sub_events == "*"
                || sub_events.split(',').any(|raw| {
                    let entry = raw.trim();
                    entry == "*" || entry == event
                })
        }
    };

    let namespace_ok = match sub_namespace {
        Some(ns) if !ns.is_empty() => ns == namespace,
        _ => true,
    };

    let agent_ok = match sub_agent {
        Some(wanted) if !wanted.is_empty() => agent == Some(wanted),
        _ => true,
    };

    event_ok && namespace_ok && agent_ok
}
255
256#[derive(Serialize)]
258struct DispatchPayload<'a> {
259 event: &'a str,
260 memory_id: &'a str,
261 namespace: &'a str,
262 agent_id: Option<&'a str>,
263 delivered_at: String,
264 #[serde(flatten, skip_serializing_if = "Option::is_none")]
269 details: Option<serde_json::Value>,
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize)]
288pub struct PromoteEventDetails {
289 pub mode: String,
292 #[serde(default, skip_serializing_if = "Option::is_none")]
294 pub tier: Option<String>,
295 #[serde(default, skip_serializing_if = "Option::is_none")]
297 pub to_namespace: Option<String>,
298 #[serde(default, skip_serializing_if = "Option::is_none")]
301 pub clone_id: Option<String>,
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize)]
309pub struct DeleteEventDetails {
310 pub title: String,
311 pub tier: String,
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize)]
319pub struct LinkCreatedEventDetails {
320 pub target_id: String,
321 pub relation: String,
322}
323
324#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct ConsolidatedEventDetails {
330 pub source_ids: Vec<String>,
331 pub source_count: usize,
332}
333
334pub fn dispatch_event(
347 conn: &Connection,
348 event: &str,
349 memory_id: &str,
350 namespace: &str,
351 agent_id: Option<&str>,
352 db_path: &std::path::Path,
353) {
354 dispatch_event_with_details(conn, event, memory_id, namespace, agent_id, db_path, None);
355}
356
357pub fn dispatch_event_with_details(
365 conn: &Connection,
366 event: &str,
367 memory_id: &str,
368 namespace: &str,
369 agent_id: Option<&str>,
370 db_path: &std::path::Path,
371 details: Option<serde_json::Value>,
372) {
373 let subs = match list(conn) {
374 Ok(s) => s,
375 Err(e) => {
376 tracing::warn!("subscription list failed during dispatch: {e}");
377 return;
378 }
379 };
380 let matching: Vec<Subscription> = subs
381 .into_iter()
382 .filter(|s| {
383 matches_filters(
384 &s.events,
385 s.event_types.as_deref(),
386 s.namespace_filter.as_deref(),
387 s.agent_filter.as_deref(),
388 event,
389 namespace,
390 agent_id,
391 )
392 })
393 .collect();
394 if matching.is_empty() {
395 return;
396 }
397 let payload = DispatchPayload {
398 event,
399 memory_id,
400 namespace,
401 agent_id,
402 delivered_at: chrono::Utc::now().to_rfc3339(),
403 details,
404 };
405 let body = match serde_json::to_string(&payload) {
406 Ok(s) => s,
407 Err(e) => {
408 tracing::warn!("dispatch payload serialize failed: {e}");
409 return;
410 }
411 };
412 let timestamp = chrono::Utc::now().timestamp().to_string();
417 for sub in matching {
418 let url = sub.url.clone();
419 let sub_id = sub.id.clone();
420 let body = body.clone();
421 let ts = timestamp.clone();
422 let db_path = db_path.to_path_buf();
423 std::thread::spawn(move || {
424 let secret_hash = match load_secret_hash(&db_path, &sub_id) {
425 Ok(s) => s,
426 Err(e) => {
427 tracing::warn!("subscription secret lookup failed: {e}");
428 return;
429 }
430 };
431 let canonical = format!("{ts}.{body}");
436 let signature = secret_hash
437 .as_deref()
438 .map(|h| hmac_sha256_hex(h, &canonical));
439 let ok = send(&url, &body, &ts, signature.as_deref());
440 record_dispatch(&db_path, &sub_id, ok);
441 });
442 }
443}
444
445fn send(url: &str, body: &str, timestamp: &str, signature: Option<&str>) -> bool {
448 if let Err(e) = validate_url(url) {
449 tracing::warn!("SSRF guard rejected webhook URL {url}: {e}");
450 return false;
451 }
452 if let Err(e) = validate_url_dns(url) {
458 tracing::warn!("DNS SSRF guard rejected webhook URL {url}: {e}");
459 return false;
460 }
461 let client = match reqwest::blocking::Client::builder()
462 .timeout(std::time::Duration::from_secs(10))
463 .build()
464 {
465 Ok(c) => c,
466 Err(e) => {
467 tracing::warn!("webhook client build failed: {e}");
468 return false;
469 }
470 };
471 let mut req = client
472 .post(url)
473 .header("content-type", "application/json")
474 .header("user-agent", "ai-memory/0.6.0.0")
475 .header("x-ai-memory-timestamp", timestamp);
476 if let Some(sig) = signature {
477 req = req.header("x-ai-memory-signature", format!("sha256={sig}"));
478 }
479 match req.body(body.to_string()).send() {
480 Ok(resp) => resp.status().is_success(),
481 Err(e) => {
482 tracing::warn!("webhook POST to {url} failed: {e}");
483 false
484 }
485 }
486}
487
488fn sha256_hex(s: &str) -> String {
490 let mut hasher = Sha256::new();
491 hasher.update(s.as_bytes());
492 format!("{:x}", hasher.finalize())
493}
494
495fn hmac_sha256_hex(key_hex: &str, body: &str) -> String {
500 const BLOCK: usize = 64;
501 let mut key = hex_decode(key_hex).unwrap_or_else(|| key_hex.as_bytes().to_vec());
506 if key.len() > BLOCK {
507 let mut h = Sha256::new();
508 h.update(&key);
509 key = h.finalize().to_vec();
510 }
511 key.resize(BLOCK, 0);
512 let mut opad = [0x5cu8; BLOCK];
513 let mut ipad = [0x36u8; BLOCK];
514 for i in 0..BLOCK {
515 opad[i] ^= key[i];
516 ipad[i] ^= key[i];
517 }
518 let mut inner = Sha256::new();
519 inner.update(ipad);
520 inner.update(body.as_bytes());
521 let inner_digest = inner.finalize();
522 let mut outer = Sha256::new();
523 outer.update(opad);
524 outer.update(inner_digest);
525 format!("{:x}", outer.finalize())
526}
527
/// Decodes a hexadecimal string into bytes.
///
/// Accepts upper- and lower-case digits. Returns `None` when the input has
/// odd length or contains any character that is not a hex digit. The empty
/// string decodes to an empty vector.
///
/// Decoding works on bytes rather than `str` slices so that non-ASCII input
/// is rejected instead of panicking on a char boundary, and each nibble is
/// matched explicitly so that sign characters such as `+` are not accepted.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for anything else.
    fn nibble(b: u8) -> Option<u8> {
        match b {
            b'0'..=b'9' => Some(b - b'0'),
            b'a'..=b'f' => Some(b - b'a' + 10),
            b'A'..=b'F' => Some(b - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    if !pairs.remainder().is_empty() {
        // Odd number of digits.
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
537
538pub fn validate_url_dns(url: &str) -> Result<()> {
548 let lower = url.to_ascii_lowercase();
549 let (_scheme, rest) = lower
550 .split_once("://")
551 .ok_or_else(|| anyhow!("webhook URL missing scheme: {url}"))?;
552 let host_end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
553 let host_port = &rest[..host_end];
554 let resolv_target =
562 if let Some(close_idx) = host_port.strip_prefix('[').and(host_port.find(']')) {
563 let after_bracket = &host_port[close_idx + 1..];
564 if after_bracket.starts_with(':') {
565 host_port.to_string()
567 } else {
568 format!("{host_port}:80")
570 }
571 } else if host_port.contains(':') {
572 host_port.to_string()
574 } else {
575 format!("{host_port}:80")
576 };
577 let addrs: Vec<std::net::SocketAddr> = match resolv_target.to_socket_addrs() {
578 Ok(iter) => iter.collect(),
579 Err(_) => return Ok(()), };
581 for addr in &addrs {
582 let ip = addr.ip();
583 if is_private(ip) && !ip.is_loopback() {
584 return Err(anyhow!(
585 "host resolves to private/link-local IP {ip}: {url}"
586 ));
587 }
588 }
589 Ok(())
590}
591
592pub fn validate_url(url: &str) -> Result<()> {
596 let lower = url.to_ascii_lowercase();
598 let (scheme, rest) = lower
599 .split_once("://")
600 .ok_or_else(|| anyhow!("webhook URL missing scheme: {url}"))?;
601 if scheme != "https" && scheme != "http" {
602 return Err(anyhow!("webhook URL scheme must be http(s): {url}"));
603 }
604 let host_end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
608 let host_port = &rest[..host_end];
609 let host: String = if let Some(stripped) = host_port.strip_prefix('[') {
610 match stripped.find(']') {
612 Some(i) => stripped[..i].to_string(),
613 None => return Err(anyhow!("malformed IPv6 URL host: {url}")),
614 }
615 } else {
616 host_port
618 .rsplit_once(':')
619 .map_or(host_port.to_string(), |(h, _)| h.to_string())
620 };
621 let host = host.as_str();
622 let is_loopback_hostname = matches!(host, "localhost" | "localhost.localdomain" | "");
624 if scheme == "http" && !is_loopback_hostname {
625 if let Ok(ip) = IpAddr::from_str(host) {
628 if !ip.is_loopback() {
629 return Err(anyhow!(
630 "webhook URL must be https for non-loopback host: {url}"
631 ));
632 }
633 } else {
634 return Err(anyhow!(
635 "webhook URL must be https for non-loopback host: {url}"
636 ));
637 }
638 }
639 if let Ok(ip) = IpAddr::from_str(host)
645 && is_private(ip)
646 && !ip.is_loopback()
647 {
648 return Err(anyhow!(
649 "webhook URL targets private / link-local address: {url}"
650 ));
651 }
652 Ok(())
653}
654
/// Reports whether `ip` lies in a range that webhooks must not target.
///
/// * IPv4: RFC 1918 private, link-local (169.254/16), multicast, broadcast
///   and unspecified (0.0.0.0).
/// * IPv6: multicast, unspecified (`::`), unique-local (`fc00::/7`) and
///   link-local (`fe80::/10`).
/// * IPv4-mapped IPv6 (`::ffff:a.b.c.d`) is classified by its embedded IPv4
///   address, since a connection to it reaches that IPv4 host.
///
/// Loopback is not covered here; callers check it separately.
fn is_private(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            v4.is_private()
                || v4.is_link_local()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_unspecified()
        }
        IpAddr::V6(v6) => {
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_private(IpAddr::V4(mapped));
            }
            let first = v6.segments()[0];
            v6.is_multicast()
                || v6.is_unspecified()
                || (first & 0xfe00) == 0xfc00 // fc00::/7 unique-local
                || (first & 0xffc0) == 0xfe80 // fe80::/10 link-local
        }
    }
}
681
682fn load_secret_hash(db_path: &std::path::Path, sub_id: &str) -> Result<Option<String>> {
683 let conn = Connection::open(db_path).context("load_secret_hash open")?;
684 let row = conn
685 .query_row(
686 "SELECT secret_hash FROM subscriptions WHERE id = ?1",
687 params![sub_id],
688 |r| r.get::<_, Option<String>>(0),
689 )
690 .context("load_secret_hash query")?;
691 Ok(row)
692}
693
694fn record_dispatch(db_path: &std::path::Path, sub_id: &str, ok: bool) {
695 let Ok(conn) = Connection::open(db_path) else {
696 return;
697 };
698 let now = chrono::Utc::now().to_rfc3339();
699 let sql = if ok {
700 "UPDATE subscriptions SET dispatch_count = dispatch_count + 1, last_dispatched_at = ?1 WHERE id = ?2"
701 } else {
702 "UPDATE subscriptions SET dispatch_count = dispatch_count + 1, failure_count = failure_count + 1, last_dispatched_at = ?1 WHERE id = ?2"
703 };
704 let _ = conn.execute(sql, params![now, sub_id]);
705}
706
707#[cfg(test)]
708mod tests {
709 use super::*;
710
711 #[test]
712 fn https_allowed() {
713 assert!(validate_url("https://example.com/hook").is_ok());
714 assert!(validate_url("https://api.example.com:8443/hook?x=1").is_ok());
715 }
716
717 #[test]
718 fn http_only_to_loopback() {
719 assert!(validate_url("http://localhost/hook").is_ok());
720 assert!(validate_url("http://127.0.0.1:8080/hook").is_ok());
721 assert!(validate_url("http://[::1]/hook").is_ok());
723 assert!(validate_url("http://example.com/hook").is_err());
724 assert!(validate_url("http://8.8.8.8/hook").is_err());
725 }
726
727 #[test]
728 fn private_ranges_blocked() {
729 assert!(validate_url("https://10.0.0.1/hook").is_err());
730 assert!(validate_url("https://192.168.1.1/hook").is_err());
731 assert!(validate_url("https://172.16.0.1/hook").is_err());
732 assert!(validate_url("https://169.254.1.1/hook").is_err());
733 assert!(validate_url("https://[fc00::1]/hook").is_err());
734 assert!(validate_url("https://[fe80::1]/hook").is_err());
735 }
736
737 #[test]
738 fn nonsense_rejected() {
739 assert!(validate_url("ftp://example.com").is_err());
740 assert!(validate_url("notaurl").is_err());
741 assert!(validate_url("").is_err());
742 }
743
744 #[test]
745 fn hmac_sha256_stable() {
746 let key = hex::encode_fallback("key".as_bytes());
749 let got = hmac_sha256_hex(&key, "The quick brown fox jumps over the lazy dog");
750 assert_eq!(
751 got,
752 "f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8"
753 );
754 }
755
756 #[test]
757 fn filter_wildcards() {
758 assert!(matches_filters(
759 "*",
760 None,
761 None,
762 None,
763 "memory_store",
764 "ns",
765 None
766 ));
767 assert!(matches_filters(
768 "memory_store,memory_delete",
769 None,
770 None,
771 None,
772 "memory_store",
773 "ns",
774 None
775 ));
776 assert!(!matches_filters(
777 "memory_delete",
778 None,
779 None,
780 None,
781 "memory_store",
782 "ns",
783 None
784 ));
785 assert!(matches_filters(
786 "*",
787 None,
788 Some("foo"),
789 None,
790 "memory_store",
791 "foo",
792 None
793 ));
794 assert!(!matches_filters(
795 "*",
796 None,
797 Some("foo"),
798 None,
799 "memory_store",
800 "bar",
801 None
802 ));
803 assert!(matches_filters(
804 "*",
805 None,
806 None,
807 Some("alice"),
808 "memory_store",
809 "ns",
810 Some("alice")
811 ));
812 assert!(!matches_filters(
813 "*",
814 None,
815 None,
816 Some("alice"),
817 "memory_store",
818 "ns",
819 Some("bob")
820 ));
821 }
822
823 #[test]
824 fn filter_event_types_overrides_legacy_events() {
825 let opt_in_store_only: Vec<String> = vec!["memory_store".to_string()];
828 assert!(matches_filters(
831 "*",
832 Some(&opt_in_store_only),
833 None,
834 None,
835 "memory_store",
836 "ns",
837 None
838 ));
839 assert!(!matches_filters(
840 "*",
841 Some(&opt_in_store_only),
842 None,
843 None,
844 "memory_delete",
845 "ns",
846 None
847 ));
848 let multi: Vec<String> = vec![
850 "memory_promote".to_string(),
851 "memory_link_created".to_string(),
852 ];
853 assert!(matches_filters(
854 "memory_store",
855 Some(&multi),
856 None,
857 None,
858 "memory_promote",
859 "ns",
860 None
861 ));
862 assert!(!matches_filters(
863 "memory_store",
864 Some(&multi),
865 None,
866 None,
867 "memory_store",
868 "ns",
869 None
870 ));
871 let empty: Vec<String> = vec![];
873 assert!(!matches_filters(
874 "*",
875 Some(&empty),
876 None,
877 None,
878 "memory_store",
879 "ns",
880 None
881 ));
882 }
883
884 #[test]
907 fn test_validate_url_dns_accepts_loopback_v4() {
908 assert!(
913 validate_url_dns("http://127.0.0.1/foo").is_ok(),
914 "127.0.0.1 should be accepted by validate_url_dns (dev/CI)"
915 );
916 assert!(
917 validate_url_dns("http://127.0.0.1:8080/").is_ok(),
918 "127.0.0.1:8080 should be accepted by validate_url_dns"
919 );
920 assert!(
921 validate_url_dns("http://localhost/").is_ok(),
922 "localhost should be accepted by validate_url_dns"
923 );
924 }
925
926 #[test]
927 fn test_validate_url_dns_accepts_loopback_v6() {
928 assert!(
930 validate_url_dns("http://[::1]/").is_ok(),
931 "[::1] should be accepted by validate_url_dns"
932 );
933 assert!(
934 validate_url_dns("http://[0:0:0:0:0:0:0:1]/").is_ok(),
935 "[::1] expanded form should be accepted"
936 );
937 }
938
939 #[test]
940 fn test_validate_url_dns_rejects_link_local_ipv6() {
941 let res = validate_url_dns("http://[fe80::1]/");
947 assert!(
948 res.is_err(),
949 "fe80::1 must be rejected as link-local IPv6, got {res:?}"
950 );
951 }
952
953 #[test]
954 fn test_validate_url_dns_rejects_aws_metadata() {
955 let res = validate_url_dns("http://169.254.169.254/latest/meta-data/");
959 assert!(
960 res.is_err(),
961 "AWS metadata IP must be rejected, got {res:?}"
962 );
963 }
964
965 #[test]
966 fn test_validate_url_dns_rejects_rfc1918_private_ranges() {
967 for url in [
971 "http://10.0.0.1/",
972 "http://172.16.0.1/",
973 "http://172.31.255.255/",
974 "http://192.168.1.1/",
975 ] {
976 let res = validate_url_dns(url);
977 assert!(
978 res.is_err(),
979 "{url} must be rejected as RFC1918, got {res:?}"
980 );
981 }
982 }
983
984 #[test]
985 fn test_validate_url_dns_accepts_public_ip_or_dns() {
986 assert!(
991 validate_url_dns("https://1.1.1.1/").is_ok(),
992 "public IP literal must be accepted"
993 );
994 assert!(
998 validate_url_dns("https://example.com/").is_ok(),
999 "public hostname must be accepted (or DNS-skip path returns Ok)"
1000 );
1001 }
1002
1003 #[test]
1004 fn test_validate_url_dns_rejects_unspecified_addresses() {
1005 let v4 = validate_url_dns("http://0.0.0.0/");
1011 let v6 = validate_url_dns("http://[::]/");
1012 assert!(
1013 v4.is_err(),
1014 "0.0.0.0 should be rejected as unspecified, got {v4:?}"
1015 );
1016 assert!(
1017 v6.is_err(),
1018 "[::] should be rejected as unspecified, got {v6:?}"
1019 );
1020 }
1021
1022 #[test]
1023 fn test_validate_url_dns_missing_scheme() {
1024 let res = validate_url_dns("not-a-url");
1026 assert!(res.is_err(), "missing scheme must Err, got {res:?}");
1027 }
1028
1029 use tempfile::NamedTempFile;
1049
1050 fn fresh_db() -> (NamedTempFile, std::path::PathBuf) {
1054 let f = NamedTempFile::new().expect("tempfile");
1055 let p = f.path().to_path_buf();
1056 let _ = crate::db::open(&p).expect("db::open");
1058 (f, p)
1059 }
1060
1061 #[test]
1064 fn insert_persists_and_list_returns_row() {
1065 let (_keep, path) = fresh_db();
1066 let conn = Connection::open(&path).unwrap();
1067 let id = insert(
1068 &conn,
1069 &NewSubscription {
1070 url: "https://example.com/hook",
1071 events: "memory_store",
1072 secret: Some("s3cret"),
1073 namespace_filter: Some("ns1"),
1074 agent_filter: Some("alice"),
1075 created_by: Some("op"),
1076 event_types: None,
1077 },
1078 )
1079 .unwrap();
1080 assert!(!id.is_empty());
1081
1082 let subs = list(&conn).unwrap();
1083 assert_eq!(subs.len(), 1);
1084 let s = &subs[0];
1085 assert_eq!(s.id, id);
1086 assert_eq!(s.url, "https://example.com/hook");
1087 assert_eq!(s.events, "memory_store");
1088 assert_eq!(s.namespace_filter.as_deref(), Some("ns1"));
1089 assert_eq!(s.agent_filter.as_deref(), Some("alice"));
1090 assert_eq!(s.created_by.as_deref(), Some("op"));
1091 assert_eq!(s.dispatch_count, 0);
1092 assert_eq!(s.failure_count, 0);
1093 }
1094
1095 #[test]
1096 fn insert_rejects_invalid_url() {
1097 let (_keep, path) = fresh_db();
1098 let conn = Connection::open(&path).unwrap();
1099 let res = insert(
1100 &conn,
1101 &NewSubscription {
1102 url: "not-a-url",
1103 events: "*",
1104 secret: None,
1105 namespace_filter: None,
1106 agent_filter: None,
1107 created_by: None,
1108 event_types: None,
1109 },
1110 );
1111 assert!(res.is_err(), "insert must reject invalid URL");
1112 }
1113
1114 #[test]
1115 fn insert_hashes_secret_before_persisting() {
1116 let (_keep, path) = fresh_db();
1117 let conn = Connection::open(&path).unwrap();
1118 let plaintext = "super-shared-secret";
1119 let id = insert(
1120 &conn,
1121 &NewSubscription {
1122 url: "https://example.com/h",
1123 events: "*",
1124 secret: Some(plaintext),
1125 namespace_filter: None,
1126 agent_filter: None,
1127 created_by: None,
1128 event_types: None,
1129 },
1130 )
1131 .unwrap();
1132 let stored: Option<String> = conn
1133 .query_row(
1134 "SELECT secret_hash FROM subscriptions WHERE id = ?1",
1135 params![id],
1136 |r| r.get(0),
1137 )
1138 .unwrap();
1139 let hash = stored.expect("secret_hash should be set");
1140 assert_ne!(hash, plaintext, "plaintext secret must not be stored");
1141 assert_eq!(hash, sha256_hex(plaintext));
1142 }
1143
1144 #[test]
1145 fn insert_no_secret_stores_null() {
1146 let (_keep, path) = fresh_db();
1147 let conn = Connection::open(&path).unwrap();
1148 let id = insert(
1149 &conn,
1150 &NewSubscription {
1151 url: "https://example.com/h",
1152 events: "*",
1153 secret: None,
1154 namespace_filter: None,
1155 agent_filter: None,
1156 created_by: None,
1157 event_types: None,
1158 },
1159 )
1160 .unwrap();
1161 let stored: Option<String> = conn
1162 .query_row(
1163 "SELECT secret_hash FROM subscriptions WHERE id = ?1",
1164 params![id],
1165 |r| r.get(0),
1166 )
1167 .unwrap();
1168 assert!(stored.is_none(), "missing secret must persist as NULL");
1169 }
1170
1171 #[test]
1172 fn delete_returns_true_when_row_removed() {
1173 let (_keep, path) = fresh_db();
1174 let conn = Connection::open(&path).unwrap();
1175 let id = insert(
1176 &conn,
1177 &NewSubscription {
1178 url: "https://example.com/h",
1179 events: "*",
1180 secret: None,
1181 namespace_filter: None,
1182 agent_filter: None,
1183 created_by: None,
1184 event_types: None,
1185 },
1186 )
1187 .unwrap();
1188 assert!(delete(&conn, &id).unwrap());
1189 assert!(list(&conn).unwrap().is_empty());
1190 }
1191
1192 #[test]
1193 fn delete_returns_false_when_row_missing() {
1194 let (_keep, path) = fresh_db();
1195 let conn = Connection::open(&path).unwrap();
1196 assert!(!delete(&conn, "nope").unwrap());
1197 }
1198
1199 #[test]
1200 fn list_orders_by_created_at_desc() {
1201 let (_keep, path) = fresh_db();
1202 let conn = Connection::open(&path).unwrap();
1203 let id1 = insert(
1206 &conn,
1207 &NewSubscription {
1208 url: "https://a.example.com/",
1209 events: "*",
1210 secret: None,
1211 namespace_filter: None,
1212 agent_filter: None,
1213 created_by: None,
1214 event_types: None,
1215 },
1216 )
1217 .unwrap();
1218 std::thread::sleep(std::time::Duration::from_millis(1100));
1219 let id2 = insert(
1220 &conn,
1221 &NewSubscription {
1222 url: "https://b.example.com/",
1223 events: "*",
1224 secret: None,
1225 namespace_filter: None,
1226 agent_filter: None,
1227 created_by: None,
1228 event_types: None,
1229 },
1230 )
1231 .unwrap();
1232 let subs = list(&conn).unwrap();
1233 assert_eq!(subs.len(), 2);
1234 assert_eq!(subs[0].id, id2);
1236 assert_eq!(subs[1].id, id1);
1237 }
1238
1239 #[test]
1242 fn sha256_hex_known_vector() {
1243 assert_eq!(
1245 sha256_hex(""),
1246 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
1247 );
1248 assert_eq!(
1250 sha256_hex("abc"),
1251 "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
1252 );
1253 }
1254
1255 #[test]
1256 fn hex_decode_round_trip_and_invalid() {
1257 let s = "deadbeef";
1259 let bytes = hex_decode(s).expect("valid hex");
1260 assert_eq!(bytes, vec![0xde, 0xad, 0xbe, 0xef]);
1261 assert!(hex_decode("abc").is_none());
1263 assert!(hex_decode("zz").is_none());
1265 }
1266
1267 #[test]
1268 fn hmac_long_key_is_hashed_to_fit_block() {
1269 let long_key: String = std::iter::repeat_n('a', 200).collect();
1274 let sig = hmac_sha256_hex(&long_key, "hello");
1275 assert_eq!(sig.len(), 64); }
1277
1278 #[test]
1279 fn hmac_invalid_hex_key_falls_back_to_raw_bytes() {
1280 let sig = hmac_sha256_hex("not-a-hex-key!!", "hello");
1284 assert_eq!(sig.len(), 64);
1285 assert!(sig.chars().all(|c| c.is_ascii_hexdigit()));
1286 }
1287
1288 #[test]
1291 fn matches_filters_event_with_whitespace_and_star() {
1292 assert!(matches_filters(
1294 "memory_store, *",
1295 None,
1296 None,
1297 None,
1298 "anything",
1299 "ns",
1300 None,
1301 ));
1302 assert!(matches_filters(
1304 " memory_delete , memory_store ",
1305 None,
1306 None,
1307 None,
1308 "memory_store",
1309 "ns",
1310 None,
1311 ));
1312 }
1313
1314 #[test]
1315 fn matches_filters_agent_filter_requires_some() {
1316 assert!(!matches_filters(
1318 "*",
1319 None,
1320 None,
1321 Some("alice"),
1322 "memory_store",
1323 "ns",
1324 None,
1325 ));
1326 }
1327
1328 #[test]
1331 fn record_dispatch_increments_counts_on_success() {
1332 let (_keep, path) = fresh_db();
1333 let id = {
1334 let conn = Connection::open(&path).unwrap();
1335 insert(
1336 &conn,
1337 &NewSubscription {
1338 url: "https://example.com/h",
1339 events: "*",
1340 secret: None,
1341 namespace_filter: None,
1342 agent_filter: None,
1343 created_by: None,
1344 event_types: None,
1345 },
1346 )
1347 .unwrap()
1348 };
1349 record_dispatch(&path, &id, true);
1350 record_dispatch(&path, &id, true);
1351 let conn = Connection::open(&path).unwrap();
1352 let (dc, fc): (i64, i64) = conn
1353 .query_row(
1354 "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
1355 params![id],
1356 |r| Ok((r.get(0)?, r.get(1)?)),
1357 )
1358 .unwrap();
1359 assert_eq!(dc, 2, "two successful dispatches must bump dispatch_count");
1360 assert_eq!(fc, 0, "successes must not bump failure_count");
1361 }
1362
1363 #[test]
1364 fn record_dispatch_increments_failure_on_err() {
1365 let (_keep, path) = fresh_db();
1366 let id = {
1367 let conn = Connection::open(&path).unwrap();
1368 insert(
1369 &conn,
1370 &NewSubscription {
1371 url: "https://example.com/h",
1372 events: "*",
1373 secret: None,
1374 namespace_filter: None,
1375 agent_filter: None,
1376 created_by: None,
1377 event_types: None,
1378 },
1379 )
1380 .unwrap()
1381 };
1382 record_dispatch(&path, &id, false);
1383 let conn = Connection::open(&path).unwrap();
1384 let (dc, fc): (i64, i64) = conn
1385 .query_row(
1386 "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
1387 params![id],
1388 |r| Ok((r.get(0)?, r.get(1)?)),
1389 )
1390 .unwrap();
1391 assert_eq!(dc, 1, "failed dispatch still bumps dispatch_count");
1392 assert_eq!(fc, 1, "failure must bump failure_count");
1393 }
1394
1395 #[test]
1396 fn record_dispatch_nonexistent_id_does_not_panic() {
1397 let (_keep, path) = fresh_db();
1398 record_dispatch(&path, "no-such-id", true);
1401 record_dispatch(&path, "no-such-id", false);
1402 let conn = Connection::open(&path).unwrap();
1404 let n: i64 = conn
1405 .query_row("SELECT COUNT(*) FROM subscriptions", [], |r| r.get(0))
1406 .unwrap();
1407 assert_eq!(n, 0);
1408 }
1409
1410 #[test]
1411 fn record_dispatch_unopenable_db_path_is_noop() {
1412 let bad = std::path::PathBuf::from("/nonexistent-dir-w12c/does-not-exist.db");
1416 record_dispatch(&bad, "x", true);
1417 }
1418
1419 #[test]
1420 fn load_secret_hash_returns_stored_hash() {
1421 let (_keep, path) = fresh_db();
1422 let id = {
1423 let conn = Connection::open(&path).unwrap();
1424 insert(
1425 &conn,
1426 &NewSubscription {
1427 url: "https://example.com/h",
1428 events: "*",
1429 secret: Some("topsecret"),
1430 namespace_filter: None,
1431 agent_filter: None,
1432 created_by: None,
1433 event_types: None,
1434 },
1435 )
1436 .unwrap()
1437 };
1438 let got = load_secret_hash(&path, &id).unwrap();
1439 assert_eq!(got, Some(sha256_hex("topsecret")));
1440 }
1441
1442 #[test]
1443 fn load_secret_hash_missing_id_errs() {
1444 let (_keep, path) = fresh_db();
1445 let res = load_secret_hash(&path, "missing-id");
1448 assert!(res.is_err(), "missing subscription id must surface as Err");
1449 }
1450
1451 #[test]
1454 fn dispatch_event_no_subs_is_noop() {
1455 let (_keep, path) = fresh_db();
1456 let conn = Connection::open(&path).unwrap();
1457 dispatch_event(&conn, "memory_store", "m1", "ns", None, &path);
1460 }
1461
1462 #[test]
1463 fn dispatch_event_filter_mismatch_skips_send() {
1464 let (_keep, path) = fresh_db();
1471 let conn = Connection::open(&path).unwrap();
1472 insert(
1473 &conn,
1474 &NewSubscription {
1475 url: "https://example.com/h",
1476 events: "memory_delete",
1477 secret: None,
1478 namespace_filter: None,
1479 agent_filter: None,
1480 created_by: None,
1481 event_types: None,
1482 },
1483 )
1484 .unwrap();
1485 dispatch_event(&conn, "memory_store", "m1", "ns", None, &path);
1486 let (dc, fc): (i64, i64) = conn
1488 .query_row(
1489 "SELECT dispatch_count, failure_count FROM subscriptions",
1490 [],
1491 |r| Ok((r.get(0)?, r.get(1)?)),
1492 )
1493 .unwrap();
1494 assert_eq!(dc, 0);
1495 assert_eq!(fc, 0);
1496 }
1497
1498 #[test]
1499 fn dispatch_event_namespace_filter_mismatch_skips() {
1500 let (_keep, path) = fresh_db();
1501 let conn = Connection::open(&path).unwrap();
1502 insert(
1503 &conn,
1504 &NewSubscription {
1505 url: "https://example.com/h",
1506 events: "*",
1507 secret: None,
1508 namespace_filter: Some("only-this-ns"),
1509 agent_filter: None,
1510 created_by: None,
1511 event_types: None,
1512 },
1513 )
1514 .unwrap();
1515 dispatch_event(&conn, "memory_store", "m1", "other-ns", None, &path);
1517 let (dc, fc): (i64, i64) = conn
1518 .query_row(
1519 "SELECT dispatch_count, failure_count FROM subscriptions",
1520 [],
1521 |r| Ok((r.get(0)?, r.get(1)?)),
1522 )
1523 .unwrap();
1524 assert_eq!(dc, 0);
1525 assert_eq!(fc, 0);
1526 }
1527
1528 #[tokio::test(flavor = "multi_thread")]
1531 async fn send_returns_true_on_2xx() {
1532 use wiremock::matchers::{method, path};
1533 use wiremock::{Mock, MockServer, ResponseTemplate};
1534 let server = MockServer::start().await;
1535 Mock::given(method("POST"))
1536 .and(path("/hook"))
1537 .respond_with(ResponseTemplate::new(200))
1538 .expect(1)
1539 .mount(&server)
1540 .await;
1541 let url = format!("{}/hook", server.uri());
1542 let ok = tokio::task::spawn_blocking(move || {
1543 send(&url, "{\"event\":\"x\"}", "1700000000", Some("deadbeef"))
1544 })
1545 .await
1546 .unwrap();
1547 assert!(ok, "2xx must return true");
1548 }
1549
1550 #[tokio::test(flavor = "multi_thread")]
1551 async fn send_returns_false_on_5xx() {
1552 use wiremock::matchers::{method, path};
1553 use wiremock::{Mock, MockServer, ResponseTemplate};
1554 let server = MockServer::start().await;
1555 Mock::given(method("POST"))
1556 .and(path("/hook"))
1557 .respond_with(ResponseTemplate::new(500))
1558 .mount(&server)
1559 .await;
1560 let url = format!("{}/hook", server.uri());
1561 let ok = tokio::task::spawn_blocking(move || {
1562 send(&url, "{\"event\":\"x\"}", "1700000000", None)
1563 })
1564 .await
1565 .unwrap();
1566 assert!(!ok, "5xx must return false (no retry inside send)");
1567 }
1568
1569 #[tokio::test(flavor = "multi_thread")]
1570 async fn send_returns_false_on_4xx() {
1571 use wiremock::matchers::{method, path};
1572 use wiremock::{Mock, MockServer, ResponseTemplate};
1573 let server = MockServer::start().await;
1574 Mock::given(method("POST"))
1575 .and(path("/hook"))
1576 .respond_with(ResponseTemplate::new(404))
1577 .mount(&server)
1578 .await;
1579 let url = format!("{}/hook", server.uri());
1580 let ok = tokio::task::spawn_blocking(move || send(&url, "{}", "1700000000", None))
1581 .await
1582 .unwrap();
1583 assert!(!ok, "4xx must return false");
1584 }
1585
1586 #[tokio::test(flavor = "multi_thread")]
1587 async fn send_signature_header_set_when_provided() {
1588 use wiremock::matchers::{header, header_exists, method, path};
1589 use wiremock::{Mock, MockServer, ResponseTemplate};
1590 let server = MockServer::start().await;
1591 Mock::given(method("POST"))
1594 .and(path("/hook"))
1595 .and(header("x-ai-memory-signature", "sha256=abc123"))
1596 .and(header_exists("x-ai-memory-timestamp"))
1597 .and(header("content-type", "application/json"))
1598 .respond_with(ResponseTemplate::new(204))
1599 .expect(1)
1600 .mount(&server)
1601 .await;
1602 let url = format!("{}/hook", server.uri());
1603 let ok =
1604 tokio::task::spawn_blocking(move || send(&url, "{}", "1700000000", Some("abc123")))
1605 .await
1606 .unwrap();
1607 assert!(ok, "2xx with matched signature header must succeed");
1608 }
1609
1610 #[tokio::test(flavor = "multi_thread")]
1611 async fn send_no_signature_header_when_secret_absent() {
1612 use wiremock::matchers::{method, path};
1613 use wiremock::{Mock, MockServer, Request, ResponseTemplate};
1614 let server = MockServer::start().await;
1615 Mock::given(method("POST"))
1616 .and(path("/hook"))
1617 .respond_with(ResponseTemplate::new(202))
1618 .mount(&server)
1619 .await;
1620 let url = format!("{}/hook", server.uri());
1621 let ok = tokio::task::spawn_blocking({
1622 let url = url.clone();
1623 move || send(&url, "{}", "1700000000", None)
1624 })
1625 .await
1626 .unwrap();
1627 assert!(ok);
1628 let received: Vec<Request> = server.received_requests().await.unwrap_or_default();
1630 assert_eq!(received.len(), 1);
1631 let req = &received[0];
1632 assert!(
1634 req.headers.get("x-ai-memory-signature").is_none(),
1635 "no signature should be sent when secret absent"
1636 );
1637 assert!(
1638 req.headers.get("x-ai-memory-timestamp").is_some(),
1639 "timestamp header must always be set"
1640 );
1641 }
1642
1643 #[test]
1644 fn send_rejects_ssrf_url_without_network() {
1645 let ok = send("https://10.0.0.1/hook", "{}", "1700000000", None);
1649 assert!(!ok, "send must reject SSRF URL via validate_url guard");
1650 }
1651
1652 #[test]
1653 fn send_rejects_invalid_scheme_without_network() {
1654 let ok = send("ftp://example.com/hook", "{}", "1700000000", None);
1656 assert!(!ok, "send must reject non-http(s) URL");
1657 }
1658
1659 #[tokio::test(flavor = "multi_thread")]
1662 async fn dispatch_event_e2e_increments_dispatch_count_on_2xx() {
1663 use wiremock::matchers::{method, path};
1664 use wiremock::{Mock, MockServer, ResponseTemplate};
1665 let server = MockServer::start().await;
1666 Mock::given(method("POST"))
1667 .and(path("/hook"))
1668 .respond_with(ResponseTemplate::new(200))
1669 .mount(&server)
1670 .await;
1671
1672 let (_keep, db_path) = fresh_db();
1673 let id = {
1675 let conn = Connection::open(&db_path).unwrap();
1676 let url = format!("{}/hook", server.uri());
1677 insert(
1678 &conn,
1679 &NewSubscription {
1680 url: &url,
1681 events: "*",
1682 secret: Some("mysecret"),
1683 namespace_filter: None,
1684 agent_filter: None,
1685 created_by: None,
1686 event_types: None,
1687 },
1688 )
1689 .unwrap()
1690 };
1691
1692 {
1696 let conn = Connection::open(&db_path).unwrap();
1697 dispatch_event(&conn, "memory_store", "m1", "ns", None, &db_path);
1698 }
1699
1700 let path_for_poll = db_path.clone();
1701 let id_for_poll = id.clone();
1702 let dc = tokio::task::spawn_blocking(move || {
1703 for _ in 0..50 {
1704 let conn = Connection::open(&path_for_poll).unwrap();
1705 let dc: i64 = conn
1706 .query_row(
1707 "SELECT dispatch_count FROM subscriptions WHERE id = ?1",
1708 params![id_for_poll],
1709 |r| r.get(0),
1710 )
1711 .unwrap();
1712 if dc > 0 {
1713 return dc;
1714 }
1715 std::thread::sleep(std::time::Duration::from_millis(100));
1716 }
1717 0
1718 })
1719 .await
1720 .unwrap();
1721 assert_eq!(dc, 1, "successful dispatch must increment dispatch_count");
1722 }
1723
1724 #[tokio::test(flavor = "multi_thread")]
1725 async fn dispatch_event_e2e_increments_failure_count_on_5xx() {
1726 use wiremock::matchers::{method, path};
1727 use wiremock::{Mock, MockServer, ResponseTemplate};
1728 let server = MockServer::start().await;
1729 Mock::given(method("POST"))
1730 .and(path("/hook"))
1731 .respond_with(ResponseTemplate::new(500))
1732 .mount(&server)
1733 .await;
1734
1735 let (_keep, db_path) = fresh_db();
1736 let id = {
1737 let conn = Connection::open(&db_path).unwrap();
1738 let url = format!("{}/hook", server.uri());
1739 insert(
1740 &conn,
1741 &NewSubscription {
1742 url: &url,
1743 events: "*",
1744 secret: None,
1745 namespace_filter: None,
1746 agent_filter: None,
1747 created_by: None,
1748 event_types: None,
1749 },
1750 )
1751 .unwrap()
1752 };
1753
1754 {
1755 let conn = Connection::open(&db_path).unwrap();
1756 dispatch_event(&conn, "memory_store", "m2", "ns", None, &db_path);
1757 }
1758
1759 let path_for_poll = db_path.clone();
1760 let id_for_poll = id.clone();
1761 let (dc, fc) = tokio::task::spawn_blocking(move || {
1762 for _ in 0..50 {
1763 let conn = Connection::open(&path_for_poll).unwrap();
1764 let row: (i64, i64) = conn
1765 .query_row(
1766 "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
1767 params![id_for_poll],
1768 |r| Ok((r.get(0)?, r.get(1)?)),
1769 )
1770 .unwrap();
1771 if row.0 > 0 {
1772 return row;
1773 }
1774 std::thread::sleep(std::time::Duration::from_millis(100));
1775 }
1776 (0, 0)
1777 })
1778 .await
1779 .unwrap();
1780 assert_eq!(dc, 1, "5xx still increments dispatch_count");
1781 assert_eq!(fc, 1, "5xx must increment failure_count");
1782 }
1783
1784 #[tokio::test(flavor = "multi_thread")]
1785 async fn dispatch_event_e2e_signature_present_when_secret_set() {
1786 use wiremock::matchers::{header_exists, method, path};
1787 use wiremock::{Mock, MockServer, ResponseTemplate};
1788 let server = MockServer::start().await;
1789 Mock::given(method("POST"))
1790 .and(path("/hook"))
1791 .and(header_exists("x-ai-memory-signature"))
1792 .and(header_exists("x-ai-memory-timestamp"))
1793 .respond_with(ResponseTemplate::new(200))
1794 .expect(1)
1795 .mount(&server)
1796 .await;
1797
1798 let (_keep, db_path) = fresh_db();
1799 let _id = {
1800 let conn = Connection::open(&db_path).unwrap();
1801 let url = format!("{}/hook", server.uri());
1802 insert(
1803 &conn,
1804 &NewSubscription {
1805 url: &url,
1806 events: "*",
1807 secret: Some("the-secret"),
1808 namespace_filter: None,
1809 agent_filter: None,
1810 created_by: None,
1811 event_types: None,
1812 },
1813 )
1814 .unwrap()
1815 };
1816
1817 {
1818 let conn = Connection::open(&db_path).unwrap();
1819 dispatch_event(&conn, "memory_store", "m3", "ns", None, &db_path);
1820 }
1821
1822 let server_ref = &server;
1826 for _ in 0..50 {
1827 let received = server_ref.received_requests().await.unwrap_or_default();
1828 if !received.is_empty() {
1829 let req = &received[0];
1830 assert!(
1831 req.headers.get("x-ai-memory-signature").is_some(),
1832 "signature header must be present when secret set"
1833 );
1834 return;
1835 }
1836 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1837 }
1838 panic!("dispatch thread never reached the mock server");
1839 }
1840}
1841
#[cfg(test)]
mod hex {
    /// Renders `bytes` as a lowercase hexadecimal string, two digits per
    /// byte (so the result is `2 * bytes.len()` characters long). An empty
    /// slice yields an empty string.
    pub fn encode_fallback(bytes: &[u8]) -> String {
        const DIGITS: &[u8; 16] = b"0123456789abcdef";
        let mut encoded = String::with_capacity(bytes.len() * 2);
        for &byte in bytes {
            // High nibble first, then low nibble.
            encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
            encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
        }
        encoded
    }
}
1850
/// Signs a JSON body containing non-ASCII text (`café`) with a key derived
/// from a non-ASCII secret, and checks that the signature is a non-empty
/// 64-character string.
#[test]
fn webhook_signing_with_unicode_payload() {
    let body = {
        let payload = serde_json::json!({
            "event": "memory_store",
            "memory_id": "m1",
            "namespace": "café",
            "agent_id": null,
            "delivered_at": "2026-01-01T00:00:00Z"
        });
        serde_json::to_string(&payload).unwrap()
    };
    // The signing key is the hex digest of the secret, not the raw secret.
    let signing_key = sha256_hex("secret-with-café");
    let signature = hmac_sha256_hex(&signing_key, &body);
    assert!(!signature.is_empty());
    // 64 hex characters correspond to a 32-byte digest.
    assert_eq!(signature.len(), 64);
}
1868
/// NOTE(review): despite its name, this test never calls into the webhook
/// code. It only asserts that two local boolean values differ, so it cannot
/// fail and does not verify any retry behaviour.
#[test]
fn webhook_retries_on_5xx_response() {
    let (outcome_2xx, outcome_5xx) = (true, false);
    assert_ne!(outcome_2xx, outcome_5xx);
}
1878
/// NOTE(review): despite its name, this test never calls into the webhook
/// code. It only asserts that two local boolean values differ, so it cannot
/// fail and does not verify any retry behaviour.
#[test]
fn webhook_does_not_retry_on_4xx_response() {
    let (outcome_4xx, outcome_success) = (false, true);
    assert_ne!(outcome_4xx, outcome_success);
}
1888
/// Exercises the namespace-filter argument of `matches_filters` with
/// `events = "*"` and a `memory_store` event:
/// an equal namespace matches, a different one does not, and an empty
/// filter string matches any namespace.
///
/// NOTE(review): the name mentions globs, but no wildcard namespace pattern
/// is exercised here.
#[test]
fn namespace_pattern_matches_glob_correctly() {
    // Only the namespace filter and the event namespace vary between cases.
    let ns_match = |ns_filter: Option<&str>, event_ns: &str| {
        matches_filters("*", None, ns_filter, None, "memory_store", event_ns, None)
    };

    assert!(ns_match(Some("app"), "app"));
    assert!(!ns_match(Some("app"), "other"));
    assert!(ns_match(Some(""), "any_ns"));
}