1use crate::errors::{AuthError, Result};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::sync::RwLock;
18use uuid::Uuid;
19
/// Event-type URIs in the `https://schemas.openid.net/secevent/caep/event-type/` namespace.
///
/// These strings are what [`CaepEvent::event_type`] holds for the events emitted by
/// [`CaepTransmitter`], and what a [`StreamConfig`] lists in `events_supported`.
pub mod event_types {
    /// URI identifying the `assurance-level-change` event type.
    pub const ASSURANCE_LEVEL_CHANGE: &str =
        "https://schemas.openid.net/secevent/caep/event-type/assurance-level-change";

    /// URI identifying the `credential-change` event type.
    pub const CREDENTIAL_CHANGE: &str =
        "https://schemas.openid.net/secevent/caep/event-type/credential-change";

    /// URI identifying the `device-compliance-change` event type.
    pub const DEVICE_COMPLIANCE_CHANGE: &str =
        "https://schemas.openid.net/secevent/caep/event-type/device-compliance-change";

    /// URI identifying the `session-revoked` event type.
    pub const SESSION_REVOKED: &str =
        "https://schemas.openid.net/secevent/caep/event-type/session-revoked";

    /// URI identifying the `token-claims-change` event type.
    pub const TOKEN_CLAIMS_CHANGE: &str =
        "https://schemas.openid.net/secevent/caep/event-type/token-claims-change";
}
35
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
40#[serde(tag = "format")]
41pub enum SubjectIdentifier {
42 #[serde(rename = "email")]
44 Email { email: String },
45 #[serde(rename = "iss_sub")]
47 IssSub { iss: String, sub: String },
48 #[serde(rename = "opaque")]
50 Opaque { id: String },
51 #[serde(rename = "phone_number")]
53 PhoneNumber { phone_number: String },
54 #[serde(rename = "session_id")]
56 SessionId {
57 session_id: String,
58 #[serde(default)]
59 iss: Option<String>,
60 },
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67#[serde(rename_all = "lowercase")]
68pub enum EventReasonAdmin {
69 Policy,
71 Admin,
73 User,
75}
76
77#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub enum ChangeType {
81 Create,
82 Revoke,
83 Update,
84 Delete,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct CaepEvent {
90 pub jti: String,
92 pub iss: String,
94 pub iat: u64,
96 pub event_type: String,
98 pub subject: SubjectIdentifier,
100 #[serde(default)]
102 pub initiating_entity: Option<EventReasonAdmin>,
103 #[serde(default)]
105 pub reason_admin: Option<String>,
106 #[serde(default)]
108 pub reason_user: Option<String>,
109 #[serde(default)]
111 pub properties: HashMap<String, serde_json::Value>,
112}
113
114impl CaepEvent {
115 pub fn new(iss: &str, event_type: &str, subject: SubjectIdentifier) -> Self {
117 let now = SystemTime::now()
118 .duration_since(UNIX_EPOCH)
119 .unwrap_or_default()
120 .as_secs();
121
122 Self {
123 jti: Uuid::new_v4().to_string(),
124 iss: iss.to_string(),
125 iat: now,
126 event_type: event_type.to_string(),
127 subject,
128 initiating_entity: None,
129 reason_admin: None,
130 reason_user: None,
131 properties: HashMap::new(),
132 }
133 }
134
135 pub fn with_initiating_entity(mut self, entity: EventReasonAdmin) -> Self {
137 self.initiating_entity = Some(entity);
138 self
139 }
140
141 pub fn with_reason_admin(mut self, reason: &str) -> Self {
143 self.reason_admin = Some(reason.to_string());
144 self
145 }
146
147 pub fn with_reason_user(mut self, reason: &str) -> Self {
149 self.reason_user = Some(reason.to_string());
150 self
151 }
152
153 pub fn with_property(mut self, key: &str, value: serde_json::Value) -> Self {
155 self.properties.insert(key.to_string(), value);
156 self
157 }
158
159 pub fn to_set_claims(&self) -> serde_json::Value {
161 let mut events: HashMap<String, serde_json::Value> = HashMap::new();
162 let mut event_body = serde_json::json!({
163 "subject": self.subject,
164 });
165 if let Some(ref entity) = self.initiating_entity {
166 event_body["initiating_entity"] = serde_json::to_value(entity).unwrap();
167 }
168 if let Some(ref r) = self.reason_admin {
169 event_body["reason_admin"] = serde_json::json!({"en": r});
170 }
171 if let Some(ref r) = self.reason_user {
172 event_body["reason_user"] = serde_json::json!({"en": r});
173 }
174 for (k, v) in &self.properties {
175 event_body[k] = v.clone();
176 }
177 events.insert(self.event_type.clone(), event_body);
178
179 serde_json::json!({
180 "jti": self.jti,
181 "iss": self.iss,
182 "iat": self.iat,
183 "events": events,
184 })
185 }
186}
187
188#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
192#[serde(rename_all = "lowercase")]
193pub enum DeliveryMethod {
194 Push,
196 Poll,
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct StreamConfig {
203 pub iss: String,
205 pub aud: Vec<String>,
207 pub events_supported: Vec<String>,
209 pub delivery_method: DeliveryMethod,
211 pub endpoint_url: String,
213}
214
215impl StreamConfig {
216 pub fn builder(iss: impl Into<String>, endpoint_url: impl Into<String>) -> StreamConfigBuilder {
218 StreamConfigBuilder {
219 iss: iss.into(),
220 aud: Vec::new(),
221 events_supported: Vec::new(),
222 delivery_method: DeliveryMethod::Push,
223 endpoint_url: endpoint_url.into(),
224 }
225 }
226
227 pub fn poll(iss: impl Into<String>, endpoint_url: impl Into<String>) -> StreamConfigBuilder {
229 Self::builder(iss, endpoint_url).delivery_method(DeliveryMethod::Poll)
230 }
231
232 pub fn push(iss: impl Into<String>, endpoint_url: impl Into<String>) -> StreamConfigBuilder {
234 Self::builder(iss, endpoint_url).delivery_method(DeliveryMethod::Push)
235 }
236}
237
238pub struct StreamConfigBuilder {
240 iss: String,
241 aud: Vec<String>,
242 events_supported: Vec<String>,
243 delivery_method: DeliveryMethod,
244 endpoint_url: String,
245}
246
247impl StreamConfigBuilder {
248 pub fn audience(mut self, audience: impl Into<String>) -> Self {
250 self.aud.push(audience.into());
251 self
252 }
253
254 pub fn audiences<I, S>(mut self, audiences: I) -> Self
256 where
257 I: IntoIterator<Item = S>,
258 S: Into<String>,
259 {
260 self.aud.extend(audiences.into_iter().map(Into::into));
261 self
262 }
263
264 pub fn supports_event(mut self, event_type: impl Into<String>) -> Self {
266 self.events_supported.push(event_type.into());
267 self
268 }
269
270 pub fn events_supported<I, S>(mut self, events: I) -> Self
272 where
273 I: IntoIterator<Item = S>,
274 S: Into<String>,
275 {
276 self.events_supported
277 .extend(events.into_iter().map(Into::into));
278 self
279 }
280
281 pub fn delivery_method(mut self, delivery_method: DeliveryMethod) -> Self {
283 self.delivery_method = delivery_method;
284 self
285 }
286
287 pub fn build(self) -> StreamConfig {
289 StreamConfig {
290 iss: self.iss,
291 aud: self.aud,
292 events_supported: self.events_supported,
293 delivery_method: self.delivery_method,
294 endpoint_url: self.endpoint_url,
295 }
296 }
297}
298
299#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
301#[serde(rename_all = "lowercase")]
302pub enum StreamStatus {
303 Enabled,
304 Paused,
305 Disabled,
306}
307
308struct EventStream {
312 config: StreamConfig,
313 status: StreamStatus,
314 events: Vec<CaepEvent>,
315}
316
317pub struct CaepTransmitter {
319 issuer: String,
320 streams: Arc<RwLock<HashMap<String, EventStream>>>,
321}
322
323impl CaepTransmitter {
324 pub fn new(issuer: &str) -> Self {
325 Self {
326 issuer: issuer.to_string(),
327 streams: Arc::new(RwLock::new(HashMap::new())),
328 }
329 }
330
331 pub async fn create_stream(&self, config: StreamConfig) -> String {
333 let stream_id = Uuid::new_v4().to_string();
334 self.streams.write().await.insert(
335 stream_id.clone(),
336 EventStream {
337 config,
338 status: StreamStatus::Enabled,
339 events: Vec::new(),
340 },
341 );
342 stream_id
343 }
344
345 pub async fn get_stream_config(&self, stream_id: &str) -> Option<StreamConfig> {
347 self.streams
348 .read()
349 .await
350 .get(stream_id)
351 .map(|s| s.config.clone())
352 }
353
354 pub async fn set_stream_status(&self, stream_id: &str, status: StreamStatus) -> Result<()> {
356 let mut streams = self.streams.write().await;
357 let stream = streams
358 .get_mut(stream_id)
359 .ok_or_else(|| AuthError::validation("Stream not found"))?;
360 stream.status = status;
361 Ok(())
362 }
363
364 pub async fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus> {
366 self.streams
367 .read()
368 .await
369 .get(stream_id)
370 .map(|s| s.status.clone())
371 }
372
373 pub async fn delete_stream(&self, stream_id: &str) -> bool {
375 self.streams.write().await.remove(stream_id).is_some()
376 }
377
378 pub async fn emit_session_revoked(
380 &self,
381 subject: SubjectIdentifier,
382 reason: Option<&str>,
383 ) -> Result<CaepEvent> {
384 let mut event = CaepEvent::new(&self.issuer, event_types::SESSION_REVOKED, subject);
385 if let Some(r) = reason {
386 event = event.with_reason_admin(r);
387 }
388 self.dispatch_event(&event).await;
389 Ok(event)
390 }
391
392 pub async fn emit_credential_change(
394 &self,
395 subject: SubjectIdentifier,
396 change_type: ChangeType,
397 ) -> Result<CaepEvent> {
398 let event = CaepEvent::new(&self.issuer, event_types::CREDENTIAL_CHANGE, subject)
399 .with_property("change_type", serde_json::to_value(&change_type).unwrap());
400 self.dispatch_event(&event).await;
401 Ok(event)
402 }
403
404 pub async fn emit_device_compliance_change(
406 &self,
407 subject: SubjectIdentifier,
408 previous_status: &str,
409 current_status: &str,
410 ) -> Result<CaepEvent> {
411 let event = CaepEvent::new(&self.issuer, event_types::DEVICE_COMPLIANCE_CHANGE, subject)
412 .with_property("previous_status", serde_json::json!(previous_status))
413 .with_property("current_status", serde_json::json!(current_status));
414 self.dispatch_event(&event).await;
415 Ok(event)
416 }
417
418 pub async fn emit_token_claims_change(
422 &self,
423 subject: SubjectIdentifier,
424 claims: HashMap<String, serde_json::Value>,
425 ) -> Result<CaepEvent> {
426 let mut event = CaepEvent::new(&self.issuer, event_types::TOKEN_CLAIMS_CHANGE, subject);
427 event = event.with_property("claims", serde_json::to_value(&claims).unwrap());
428 self.dispatch_event(&event).await;
429 Ok(event)
430 }
431
432 pub async fn emit_assurance_level_change(
436 &self,
437 subject: SubjectIdentifier,
438 current_level: &str,
439 previous_level: &str,
440 change_direction: &str,
441 ) -> Result<CaepEvent> {
442 let event = CaepEvent::new(&self.issuer, event_types::ASSURANCE_LEVEL_CHANGE, subject)
443 .with_property("current_level", serde_json::json!(current_level))
444 .with_property("previous_level", serde_json::json!(previous_level))
445 .with_property("change_direction", serde_json::json!(change_direction));
446 self.dispatch_event(&event).await;
447 Ok(event)
448 }
449
450 pub async fn poll_events(&self, stream_id: &str) -> Result<Vec<CaepEvent>> {
452 let mut streams = self.streams.write().await;
453 let stream = streams
454 .get_mut(stream_id)
455 .ok_or_else(|| AuthError::validation("Stream not found"))?;
456
457 if stream.status != StreamStatus::Enabled {
458 return Err(AuthError::validation("Stream is not enabled"));
459 }
460
461 let events = std::mem::take(&mut stream.events);
462 Ok(events)
463 }
464
465 async fn dispatch_event(&self, event: &CaepEvent) {
467 let mut streams = self.streams.write().await;
468 for stream in streams.values_mut() {
469 if stream.status != StreamStatus::Enabled {
470 continue;
471 }
472 if stream.config.events_supported.is_empty()
474 || stream
475 .config
476 .events_supported
477 .iter()
478 .any(|e| e == &event.event_type)
479 {
480 stream.events.push(event.clone());
481 }
482 }
483 }
484
485 pub async fn active_stream_count(&self) -> usize {
487 self.streams
488 .read()
489 .await
490 .values()
491 .filter(|s| s.status == StreamStatus::Enabled)
492 .count()
493 }
494}
495
496pub type EventHandler = Arc<dyn Fn(&CaepEvent) + Send + Sync>;
500
501pub struct CaepReceiver {
503 handlers: Arc<RwLock<HashMap<String, Vec<EventHandler>>>>,
504 received_jtis: Arc<RwLock<std::collections::HashSet<String>>>,
505}
506
507impl CaepReceiver {
508 pub fn new() -> Self {
509 Self {
510 handlers: Arc::new(RwLock::new(HashMap::new())),
511 received_jtis: Arc::new(RwLock::new(std::collections::HashSet::new())),
512 }
513 }
514
515 pub async fn on_event(&self, event_type: &str, handler: EventHandler) {
517 self.handlers
518 .write()
519 .await
520 .entry(event_type.to_string())
521 .or_default()
522 .push(handler);
523 }
524
525 pub async fn process_event(&self, event: &CaepEvent) -> Result<bool> {
529 {
531 let mut jtis = self.received_jtis.write().await;
532 if !jtis.insert(event.jti.clone()) {
533 return Ok(false); }
535 }
536
537 let handlers = self.handlers.read().await;
539 if let Some(handler_list) = handlers.get(&event.event_type) {
540 for handler in handler_list {
541 handler(event);
542 }
543 }
544
545 Ok(true)
546 }
547
548 pub async fn was_processed(&self, jti: &str) -> bool {
550 self.received_jtis.read().await.contains(jti)
551 }
552}
553
554impl Default for CaepReceiver {
555 fn default() -> Self {
556 Self::new()
557 }
558}
559
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Shorthand for an e-mail subject.
    fn email_subject(address: &str) -> SubjectIdentifier {
        SubjectIdentifier::Email {
            email: address.to_string(),
        }
    }

    /// Shorthand for an opaque subject.
    fn opaque_subject(id: &str) -> SubjectIdentifier {
        SubjectIdentifier::Opaque { id: id.to_string() }
    }

    /// Poll stream with empty issuer and endpoint that accepts only session-revoked events.
    fn session_revoked_only_stream() -> StreamConfig {
        StreamConfig::poll("", "")
            .supports_event(event_types::SESSION_REVOKED)
            .build()
    }

    // ---- subject identifiers ------------------------------------------------

    #[test]
    fn test_subject_email_serialization() {
        let json = serde_json::to_value(email_subject("user@example.com")).unwrap();
        assert_eq!(json["format"], "email");
        assert_eq!(json["email"], "user@example.com");
    }

    #[test]
    fn test_subject_iss_sub_serialization() {
        let subject = SubjectIdentifier::IssSub {
            iss: "https://issuer.example".to_string(),
            sub: "user-123".to_string(),
        };
        let json = serde_json::to_value(&subject).unwrap();
        assert_eq!(json["format"], "iss_sub");
    }

    #[test]
    fn test_subject_session_id_serialization() {
        let subject = SubjectIdentifier::SessionId {
            session_id: "sess-001".to_string(),
            iss: Some("https://issuer.example".to_string()),
        };
        let json = serde_json::to_value(&subject).unwrap();
        assert_eq!(json["format"], "session_id");
        assert_eq!(json["session_id"], "sess-001");
    }

    #[test]
    fn test_subject_roundtrip() {
        let original = opaque_subject("opaque-id-42");
        let text = serde_json::to_string(&original).unwrap();
        let restored: SubjectIdentifier = serde_json::from_str(&text).unwrap();
        assert_eq!(original, restored);
    }

    // ---- events -------------------------------------------------------------

    #[test]
    fn test_event_creation() {
        let event = CaepEvent::new(
            "https://issuer.example",
            event_types::SESSION_REVOKED,
            email_subject("user@x.com"),
        );
        assert!(!event.jti.is_empty());
        assert!(event.iat > 0);
        assert_eq!(event.event_type, event_types::SESSION_REVOKED);
    }

    #[test]
    fn test_event_builder_chain() {
        let base = CaepEvent::new(
            "https://issuer.example",
            event_types::CREDENTIAL_CHANGE,
            email_subject("u@x.com"),
        );
        let event = base
            .with_initiating_entity(EventReasonAdmin::Admin)
            .with_reason_admin("Password reset")
            .with_reason_user("Your password was reset by admin")
            .with_property("change_type", serde_json::json!("revoke"));

        assert_eq!(event.initiating_entity, Some(EventReasonAdmin::Admin));
        assert_eq!(event.reason_admin.as_deref(), Some("Password reset"));
        assert!(event.properties.contains_key("change_type"));
    }

    #[test]
    fn test_event_to_set_claims() {
        let claims = CaepEvent::new(
            "https://iss.example",
            event_types::SESSION_REVOKED,
            email_subject("u@x.com"),
        )
        .with_reason_admin("Policy violation")
        .to_set_claims();

        assert_eq!(claims["iss"], "https://iss.example");
        let body = &claims["events"][event_types::SESSION_REVOKED];
        assert!(body.is_object());
        assert!(body["subject"].is_object());
        assert_eq!(body["reason_admin"]["en"], "Policy violation");
    }

    // ---- transmitter --------------------------------------------------------

    #[tokio::test]
    async fn test_transmitter_create_stream() {
        let transmitter = CaepTransmitter::new("https://issuer.example");
        let config =
            StreamConfig::poll("https://issuer.example", "https://receiver.example/events")
                .audience("https://receiver.example")
                .supports_event(event_types::SESSION_REVOKED)
                .build();
        let expected_iss = config.iss.clone();

        let stream_id = transmitter.create_stream(config).await;
        assert!(!stream_id.is_empty());
        assert_eq!(transmitter.active_stream_count().await, 1);

        let stored = transmitter.get_stream_config(&stream_id).await.unwrap();
        assert_eq!(stored.iss, expected_iss);
    }

    #[tokio::test]
    async fn test_transmitter_stream_status() {
        let transmitter = CaepTransmitter::new("https://iss.example");
        let config = StreamConfig::poll("https://iss.example", "").build();
        let stream_id = transmitter.create_stream(config).await;

        assert_eq!(
            transmitter.get_stream_status(&stream_id).await,
            Some(StreamStatus::Enabled)
        );

        transmitter
            .set_stream_status(&stream_id, StreamStatus::Paused)
            .await
            .unwrap();

        assert_eq!(
            transmitter.get_stream_status(&stream_id).await,
            Some(StreamStatus::Paused)
        );
        assert_eq!(transmitter.active_stream_count().await, 0);
    }

    #[tokio::test]
    async fn test_transmitter_delete_stream() {
        let transmitter = CaepTransmitter::new("https://iss.example");
        let stream_id = transmitter
            .create_stream(StreamConfig::push("", "").build())
            .await;

        assert!(transmitter.delete_stream(&stream_id).await);
        // A second delete finds nothing left to remove.
        assert!(!transmitter.delete_stream(&stream_id).await);
        assert_eq!(transmitter.active_stream_count().await, 0);
    }

    #[tokio::test]
    async fn test_transmitter_emit_session_revoked() {
        let transmitter = CaepTransmitter::new("https://iss.example");
        let stream_id = transmitter
            .create_stream(session_revoked_only_stream())
            .await;

        let emitted = transmitter
            .emit_session_revoked(email_subject("u@x.com"), Some("Security policy"))
            .await
            .unwrap();
        assert_eq!(emitted.event_type, event_types::SESSION_REVOKED);

        let first_poll = transmitter.poll_events(&stream_id).await.unwrap();
        assert_eq!(first_poll.len(), 1);
        assert_eq!(first_poll[0].jti, emitted.jti);

        // Polling drains the queue, so an immediate second poll is empty.
        let second_poll = transmitter.poll_events(&stream_id).await.unwrap();
        assert!(second_poll.is_empty());
    }

    #[tokio::test]
    async fn test_transmitter_event_filtering() {
        let transmitter = CaepTransmitter::new("https://iss.example");
        let stream_id = transmitter
            .create_stream(session_revoked_only_stream())
            .await;

        // The stream only accepts session-revoked events.
        transmitter
            .emit_credential_change(email_subject("u@x.com"), ChangeType::Revoke)
            .await
            .unwrap();

        let polled = transmitter.poll_events(&stream_id).await.unwrap();
        assert!(polled.is_empty());
    }

    #[tokio::test]
    async fn test_transmitter_paused_stream_no_events() {
        let transmitter = CaepTransmitter::new("https://iss.example");
        let stream_id = transmitter
            .create_stream(StreamConfig::poll("", "").build())
            .await;

        transmitter
            .set_stream_status(&stream_id, StreamStatus::Paused)
            .await
            .unwrap();

        transmitter
            .emit_session_revoked(email_subject("u@x.com"), None)
            .await
            .unwrap();

        // Polling a stream that is not enabled is rejected.
        assert!(transmitter.poll_events(&stream_id).await.is_err());
    }

    // ---- stream configuration -----------------------------------------------

    #[test]
    fn test_stream_config_builder() {
        let config =
            StreamConfig::poll("https://issuer.example", "https://receiver.example/events")
                .audience("https://receiver.example")
                .audiences(["https://backup.example"])
                .supports_event(event_types::SESSION_REVOKED)
                .events_supported([event_types::CREDENTIAL_CHANGE])
                .build();

        assert_eq!(config.delivery_method, DeliveryMethod::Poll);
        assert_eq!(config.aud.len(), 2);
        assert_eq!(config.events_supported.len(), 2);
    }

    #[test]
    fn test_delivery_method_serialization() {
        let push = serde_json::to_string(&DeliveryMethod::Push).unwrap();
        let poll = serde_json::to_string(&DeliveryMethod::Poll).unwrap();
        assert_eq!(push, r#""push""#);
        assert_eq!(poll, r#""poll""#);
    }

    // ---- receiver -----------------------------------------------------------

    #[tokio::test]
    async fn test_receiver_process_event() {
        let receiver = CaepReceiver::new();
        let fired = Arc::new(AtomicBool::new(false));
        let fired_in_handler = Arc::clone(&fired);

        receiver
            .on_event(
                event_types::SESSION_REVOKED,
                Arc::new(move |_: &CaepEvent| {
                    fired_in_handler.store(true, Ordering::SeqCst);
                }),
            )
            .await;

        let event = CaepEvent::new(
            "https://iss.example",
            event_types::SESSION_REVOKED,
            email_subject("u@x.com"),
        );

        let processed = receiver.process_event(&event).await.unwrap();
        assert!(processed);
        assert!(fired.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_receiver_deduplication() {
        let receiver = CaepReceiver::new();
        let event = CaepEvent::new(
            "https://iss.example",
            event_types::CREDENTIAL_CHANGE,
            opaque_subject("x"),
        );

        assert!(receiver.process_event(&event).await.unwrap());

        // The same `jti` a second time is reported as a duplicate.
        assert!(!receiver.process_event(&event).await.unwrap());
        assert!(receiver.was_processed(&event.jti).await);
    }

    #[tokio::test]
    async fn test_receiver_unhandled_event_type() {
        let receiver = CaepReceiver::new();
        let event = CaepEvent::new(
            "https://iss.example",
            "custom:unknown-event",
            opaque_subject("x"),
        );

        // An event nobody registered for still counts as processed.
        let processed = receiver.process_event(&event).await.unwrap();
        assert!(processed);
    }
}