Skip to main content

auth_framework/protocols/
caep.rs

1//! CAEP (Continuous Access Evaluation Protocol) implementation.
2//!
3//! Implements the Shared Signals and Events (SSE) Framework with CAEP event types
4//! for continuous access evaluation, session revocation, and compliance signalling.
5//!
6//! # References
7//!
8//! - [CAEP spec](https://openid.net/specs/openid-caep-specification-1_0.html)
9//! - [SSE Framework](https://openid.net/specs/openid-sse-framework-1_0.html)
10//! - [SET (Security Event Token) RFC 8417](https://www.rfc-editor.org/rfc/rfc8417)
11
12use crate::errors::{AuthError, Result};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::sync::RwLock;
18use uuid::Uuid;
19
20// ── Event types (CAEP §3) ───────────────────────────────────────────
21
22/// Well-known CAEP event type URIs.
23pub mod event_types {
24    pub const SESSION_REVOKED: &str =
25        "https://schemas.openid.net/secevent/caep/event-type/session-revoked";
26    pub const TOKEN_CLAIMS_CHANGE: &str =
27        "https://schemas.openid.net/secevent/caep/event-type/token-claims-change";
28    pub const CREDENTIAL_CHANGE: &str =
29        "https://schemas.openid.net/secevent/caep/event-type/credential-change";
30    pub const ASSURANCE_LEVEL_CHANGE: &str =
31        "https://schemas.openid.net/secevent/caep/event-type/assurance-level-change";
32    pub const DEVICE_COMPLIANCE_CHANGE: &str =
33        "https://schemas.openid.net/secevent/caep/event-type/device-compliance-change";
34}
35
36// ── Subject identifiers (SSE §3) ───────────────────────────────────
37
38/// Subject identifier formats per the SSE Framework.
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
40#[serde(tag = "format")]
41pub enum SubjectIdentifier {
42    /// Email-based subject.
43    #[serde(rename = "email")]
44    Email { email: String },
45    /// Issuer + subject pair.
46    #[serde(rename = "iss_sub")]
47    IssSub { iss: String, sub: String },
48    /// Opaque identifier.
49    #[serde(rename = "opaque")]
50    Opaque { id: String },
51    /// Phone number (+E.164).
52    #[serde(rename = "phone_number")]
53    PhoneNumber { phone_number: String },
54    /// Session ID.
55    #[serde(rename = "session_id")]
56    SessionId {
57        session_id: String,
58        #[serde(default)]
59        iss: Option<String>,
60    },
61}
62
63// ── CAEP Event ──────────────────────────────────────────────────────
64
65/// The reason/initiating entity for a CAEP event.
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67#[serde(rename_all = "lowercase")]
68pub enum EventReasonAdmin {
69    /// Policy-driven.
70    Policy,
71    /// Admin-initiated.
72    Admin,
73    /// User-initiated.
74    User,
75}
76
77/// Change type for credential/compliance events.
78#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub enum ChangeType {
81    Create,
82    Revoke,
83    Update,
84    Delete,
85}
86
87/// A CAEP event (carried as a Security Event Token — SET).
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct CaepEvent {
90    /// Unique event ID (jti).
91    pub jti: String,
92    /// Issuer.
93    pub iss: String,
94    /// Issued-at (Unix timestamp).
95    pub iat: u64,
96    /// Event type URI.
97    pub event_type: String,
98    /// Subject identifier.
99    pub subject: SubjectIdentifier,
100    /// Initiating entity.
101    #[serde(default)]
102    pub initiating_entity: Option<EventReasonAdmin>,
103    /// Reason string for the event.
104    #[serde(default)]
105    pub reason_admin: Option<String>,
106    /// Reason string shown to the user.
107    #[serde(default)]
108    pub reason_user: Option<String>,
109    /// Additional event-specific claims.
110    #[serde(default)]
111    pub properties: HashMap<String, serde_json::Value>,
112}
113
114impl CaepEvent {
115    /// Build a new CAEP event with generated ID and timestamp.
116    pub fn new(iss: &str, event_type: &str, subject: SubjectIdentifier) -> Self {
117        let now = SystemTime::now()
118            .duration_since(UNIX_EPOCH)
119            .unwrap_or_default()
120            .as_secs();
121
122        Self {
123            jti: Uuid::new_v4().to_string(),
124            iss: iss.to_string(),
125            iat: now,
126            event_type: event_type.to_string(),
127            subject,
128            initiating_entity: None,
129            reason_admin: None,
130            reason_user: None,
131            properties: HashMap::new(),
132        }
133    }
134
135    /// Set the initiating entity.
136    pub fn with_initiating_entity(mut self, entity: EventReasonAdmin) -> Self {
137        self.initiating_entity = Some(entity);
138        self
139    }
140
141    /// Set the admin reason.
142    pub fn with_reason_admin(mut self, reason: &str) -> Self {
143        self.reason_admin = Some(reason.to_string());
144        self
145    }
146
147    /// Set the user-facing reason.
148    pub fn with_reason_user(mut self, reason: &str) -> Self {
149        self.reason_user = Some(reason.to_string());
150        self
151    }
152
153    /// Add an event-specific property.
154    pub fn with_property(mut self, key: &str, value: serde_json::Value) -> Self {
155        self.properties.insert(key.to_string(), value);
156        self
157    }
158
159    /// Encode the event as a SET (Security Event Token) claims payload.
160    pub fn to_set_claims(&self) -> serde_json::Value {
161        let mut events: HashMap<String, serde_json::Value> = HashMap::new();
162        let mut event_body = serde_json::json!({
163            "subject": self.subject,
164        });
165        if let Some(ref entity) = self.initiating_entity {
166            event_body["initiating_entity"] = serde_json::to_value(entity).unwrap();
167        }
168        if let Some(ref r) = self.reason_admin {
169            event_body["reason_admin"] = serde_json::json!({"en": r});
170        }
171        if let Some(ref r) = self.reason_user {
172            event_body["reason_user"] = serde_json::json!({"en": r});
173        }
174        for (k, v) in &self.properties {
175            event_body[k] = v.clone();
176        }
177        events.insert(self.event_type.clone(), event_body);
178
179        serde_json::json!({
180            "jti": self.jti,
181            "iss": self.iss,
182            "iat": self.iat,
183            "events": events,
184        })
185    }
186}
187
188// ── Stream configuration (SSE §6) ──────────────────────────────────
189
190/// SSE stream delivery method.
191#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
192#[serde(rename_all = "lowercase")]
193pub enum DeliveryMethod {
194    /// Push via HTTP POST.
195    Push,
196    /// Poll (receiver calls GET).
197    Poll,
198}
199
200/// SSE stream configuration.
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct StreamConfig {
203    /// Issuer URL.
204    pub iss: String,
205    /// Audience for events on this stream.
206    pub aud: Vec<String>,
207    /// Supported event types.
208    pub events_supported: Vec<String>,
209    /// Delivery method.
210    pub delivery_method: DeliveryMethod,
211    /// Endpoint URL (push: receiver's endpoint; poll: transmitter's endpoint).
212    pub endpoint_url: String,
213}
214
215impl StreamConfig {
216    /// Create a builder for an event stream configuration.
217    pub fn builder(iss: impl Into<String>, endpoint_url: impl Into<String>) -> StreamConfigBuilder {
218        StreamConfigBuilder {
219            iss: iss.into(),
220            aud: Vec::new(),
221            events_supported: Vec::new(),
222            delivery_method: DeliveryMethod::Push,
223            endpoint_url: endpoint_url.into(),
224        }
225    }
226
227    /// Create a poll-based stream configuration builder.
228    pub fn poll(iss: impl Into<String>, endpoint_url: impl Into<String>) -> StreamConfigBuilder {
229        Self::builder(iss, endpoint_url).delivery_method(DeliveryMethod::Poll)
230    }
231
232    /// Create a push-based stream configuration builder.
233    pub fn push(iss: impl Into<String>, endpoint_url: impl Into<String>) -> StreamConfigBuilder {
234        Self::builder(iss, endpoint_url).delivery_method(DeliveryMethod::Push)
235    }
236}
237
238/// Builder for CAEP stream configuration.
239pub struct StreamConfigBuilder {
240    iss: String,
241    aud: Vec<String>,
242    events_supported: Vec<String>,
243    delivery_method: DeliveryMethod,
244    endpoint_url: String,
245}
246
247impl StreamConfigBuilder {
248    /// Add a single audience to the stream.
249    pub fn audience(mut self, audience: impl Into<String>) -> Self {
250        self.aud.push(audience.into());
251        self
252    }
253
254    /// Add multiple audiences to the stream.
255    pub fn audiences<I, S>(mut self, audiences: I) -> Self
256    where
257        I: IntoIterator<Item = S>,
258        S: Into<String>,
259    {
260        self.aud.extend(audiences.into_iter().map(Into::into));
261        self
262    }
263
264    /// Add a supported event type to the stream.
265    pub fn supports_event(mut self, event_type: impl Into<String>) -> Self {
266        self.events_supported.push(event_type.into());
267        self
268    }
269
270    /// Add multiple supported event types.
271    pub fn events_supported<I, S>(mut self, events: I) -> Self
272    where
273        I: IntoIterator<Item = S>,
274        S: Into<String>,
275    {
276        self.events_supported
277            .extend(events.into_iter().map(Into::into));
278        self
279    }
280
281    /// Set the delivery method.
282    pub fn delivery_method(mut self, delivery_method: DeliveryMethod) -> Self {
283        self.delivery_method = delivery_method;
284        self
285    }
286
287    /// Build the stream configuration.
288    pub fn build(self) -> StreamConfig {
289        StreamConfig {
290            iss: self.iss,
291            aud: self.aud,
292            events_supported: self.events_supported,
293            delivery_method: self.delivery_method,
294            endpoint_url: self.endpoint_url,
295        }
296    }
297}
298
299/// Stream status.
300#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
301#[serde(rename_all = "lowercase")]
302pub enum StreamStatus {
303    Enabled,
304    Paused,
305    Disabled,
306}
307
308// ── Event transmitter ───────────────────────────────────────────────
309
310/// A registered event stream with its buffered events.
311struct EventStream {
312    config: StreamConfig,
313    status: StreamStatus,
314    events: Vec<CaepEvent>,
315}
316
317/// CAEP event transmitter — manages streams and dispatches events.
318pub struct CaepTransmitter {
319    issuer: String,
320    streams: Arc<RwLock<HashMap<String, EventStream>>>,
321}
322
323impl CaepTransmitter {
324    pub fn new(issuer: &str) -> Self {
325        Self {
326            issuer: issuer.to_string(),
327            streams: Arc::new(RwLock::new(HashMap::new())),
328        }
329    }
330
331    /// Register a new event stream. Returns the stream ID.
332    pub async fn create_stream(&self, config: StreamConfig) -> String {
333        let stream_id = Uuid::new_v4().to_string();
334        self.streams.write().await.insert(
335            stream_id.clone(),
336            EventStream {
337                config,
338                status: StreamStatus::Enabled,
339                events: Vec::new(),
340            },
341        );
342        stream_id
343    }
344
345    /// Get stream configuration.
346    pub async fn get_stream_config(&self, stream_id: &str) -> Option<StreamConfig> {
347        self.streams
348            .read()
349            .await
350            .get(stream_id)
351            .map(|s| s.config.clone())
352    }
353
354    /// Update stream status.
355    pub async fn set_stream_status(&self, stream_id: &str, status: StreamStatus) -> Result<()> {
356        let mut streams = self.streams.write().await;
357        let stream = streams
358            .get_mut(stream_id)
359            .ok_or_else(|| AuthError::validation("Stream not found"))?;
360        stream.status = status;
361        Ok(())
362    }
363
364    /// Get stream status.
365    pub async fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus> {
366        self.streams
367            .read()
368            .await
369            .get(stream_id)
370            .map(|s| s.status.clone())
371    }
372
373    /// Delete a stream.
374    pub async fn delete_stream(&self, stream_id: &str) -> bool {
375        self.streams.write().await.remove(stream_id).is_some()
376    }
377
378    /// Emit a session-revoked event to all enabled streams.
379    pub async fn emit_session_revoked(
380        &self,
381        subject: SubjectIdentifier,
382        reason: Option<&str>,
383    ) -> Result<CaepEvent> {
384        let mut event = CaepEvent::new(&self.issuer, event_types::SESSION_REVOKED, subject);
385        if let Some(r) = reason {
386            event = event.with_reason_admin(r);
387        }
388        self.dispatch_event(&event).await;
389        Ok(event)
390    }
391
392    /// Emit a credential-change event.
393    pub async fn emit_credential_change(
394        &self,
395        subject: SubjectIdentifier,
396        change_type: ChangeType,
397    ) -> Result<CaepEvent> {
398        let event = CaepEvent::new(&self.issuer, event_types::CREDENTIAL_CHANGE, subject)
399            .with_property("change_type", serde_json::to_value(&change_type).unwrap());
400        self.dispatch_event(&event).await;
401        Ok(event)
402    }
403
404    /// Emit a device-compliance-change event.
405    pub async fn emit_device_compliance_change(
406        &self,
407        subject: SubjectIdentifier,
408        previous_status: &str,
409        current_status: &str,
410    ) -> Result<CaepEvent> {
411        let event = CaepEvent::new(&self.issuer, event_types::DEVICE_COMPLIANCE_CHANGE, subject)
412            .with_property("previous_status", serde_json::json!(previous_status))
413            .with_property("current_status", serde_json::json!(current_status));
414        self.dispatch_event(&event).await;
415        Ok(event)
416    }
417
418    /// Emit a token-claims-change event.
419    ///
420    /// Signals that one or more claims in a previously issued token have changed.
421    pub async fn emit_token_claims_change(
422        &self,
423        subject: SubjectIdentifier,
424        claims: HashMap<String, serde_json::Value>,
425    ) -> Result<CaepEvent> {
426        let mut event = CaepEvent::new(&self.issuer, event_types::TOKEN_CLAIMS_CHANGE, subject);
427        event = event.with_property("claims", serde_json::to_value(&claims).unwrap());
428        self.dispatch_event(&event).await;
429        Ok(event)
430    }
431
432    /// Emit an assurance-level-change event.
433    ///
434    /// Signals that the authentication assurance level for a subject has changed.
435    pub async fn emit_assurance_level_change(
436        &self,
437        subject: SubjectIdentifier,
438        current_level: &str,
439        previous_level: &str,
440        change_direction: &str,
441    ) -> Result<CaepEvent> {
442        let event = CaepEvent::new(&self.issuer, event_types::ASSURANCE_LEVEL_CHANGE, subject)
443            .with_property("current_level", serde_json::json!(current_level))
444            .with_property("previous_level", serde_json::json!(previous_level))
445            .with_property("change_direction", serde_json::json!(change_direction));
446        self.dispatch_event(&event).await;
447        Ok(event)
448    }
449
450    /// Poll events for a stream (for pull-based delivery).
451    pub async fn poll_events(&self, stream_id: &str) -> Result<Vec<CaepEvent>> {
452        let mut streams = self.streams.write().await;
453        let stream = streams
454            .get_mut(stream_id)
455            .ok_or_else(|| AuthError::validation("Stream not found"))?;
456
457        if stream.status != StreamStatus::Enabled {
458            return Err(AuthError::validation("Stream is not enabled"));
459        }
460
461        let events = std::mem::take(&mut stream.events);
462        Ok(events)
463    }
464
465    /// Dispatch an event to all enabled streams.
466    async fn dispatch_event(&self, event: &CaepEvent) {
467        let mut streams = self.streams.write().await;
468        for stream in streams.values_mut() {
469            if stream.status != StreamStatus::Enabled {
470                continue;
471            }
472            // Check if this stream accepts the event type
473            if stream.config.events_supported.is_empty()
474                || stream
475                    .config
476                    .events_supported
477                    .iter()
478                    .any(|e| e == &event.event_type)
479            {
480                stream.events.push(event.clone());
481            }
482        }
483    }
484
485    /// Count active (enabled) streams.
486    pub async fn active_stream_count(&self) -> usize {
487        self.streams
488            .read()
489            .await
490            .values()
491            .filter(|s| s.status == StreamStatus::Enabled)
492            .count()
493    }
494}
495
496// ── Event receiver ──────────────────────────────────────────────────
497
498/// Callback type for handling received CAEP events.
499pub type EventHandler = Arc<dyn Fn(&CaepEvent) + Send + Sync>;
500
501/// CAEP event receiver — processes incoming events.
502pub struct CaepReceiver {
503    handlers: Arc<RwLock<HashMap<String, Vec<EventHandler>>>>,
504    received_jtis: Arc<RwLock<std::collections::HashSet<String>>>,
505}
506
507impl CaepReceiver {
508    pub fn new() -> Self {
509        Self {
510            handlers: Arc::new(RwLock::new(HashMap::new())),
511            received_jtis: Arc::new(RwLock::new(std::collections::HashSet::new())),
512        }
513    }
514
515    /// Register a handler for a specific event type.
516    pub async fn on_event(&self, event_type: &str, handler: EventHandler) {
517        self.handlers
518            .write()
519            .await
520            .entry(event_type.to_string())
521            .or_default()
522            .push(handler);
523    }
524
525    /// Process a received CAEP event.
526    ///
527    /// Deduplicates by jti and invokes registered handlers.
528    pub async fn process_event(&self, event: &CaepEvent) -> Result<bool> {
529        // Deduplicate
530        {
531            let mut jtis = self.received_jtis.write().await;
532            if !jtis.insert(event.jti.clone()) {
533                return Ok(false); // Already processed
534            }
535        }
536
537        // Invoke handlers
538        let handlers = self.handlers.read().await;
539        if let Some(handler_list) = handlers.get(&event.event_type) {
540            for handler in handler_list {
541                handler(event);
542            }
543        }
544
545        Ok(true)
546    }
547
548    /// Check if an event was already processed.
549    pub async fn was_processed(&self, jti: &str) -> bool {
550        self.received_jtis.read().await.contains(jti)
551    }
552}
553
554impl Default for CaepReceiver {
555    fn default() -> Self {
556        Self::new()
557    }
558}
559
560#[cfg(test)]
561mod tests {
562    use super::*;
563
564    // ── Subject identifiers ─────────────────────────────────────
565
566    #[test]
567    fn test_subject_email_serialization() {
568        let sub = SubjectIdentifier::Email {
569            email: "user@example.com".to_string(),
570        };
571        let json = serde_json::to_value(&sub).unwrap();
572        assert_eq!(json["format"], "email");
573        assert_eq!(json["email"], "user@example.com");
574    }
575
576    #[test]
577    fn test_subject_iss_sub_serialization() {
578        let sub = SubjectIdentifier::IssSub {
579            iss: "https://issuer.example".to_string(),
580            sub: "user-123".to_string(),
581        };
582        let json = serde_json::to_value(&sub).unwrap();
583        assert_eq!(json["format"], "iss_sub");
584    }
585
586    #[test]
587    fn test_subject_session_id_serialization() {
588        let sub = SubjectIdentifier::SessionId {
589            session_id: "sess-001".to_string(),
590            iss: Some("https://issuer.example".to_string()),
591        };
592        let json = serde_json::to_value(&sub).unwrap();
593        assert_eq!(json["format"], "session_id");
594        assert_eq!(json["session_id"], "sess-001");
595    }
596
597    #[test]
598    fn test_subject_roundtrip() {
599        let sub = SubjectIdentifier::Opaque {
600            id: "opaque-id-42".to_string(),
601        };
602        let serialized = serde_json::to_string(&sub).unwrap();
603        let deserialized: SubjectIdentifier = serde_json::from_str(&serialized).unwrap();
604        assert_eq!(sub, deserialized);
605    }
606
607    // ── CaepEvent construction ──────────────────────────────────
608
609    #[test]
610    fn test_event_creation() {
611        let event = CaepEvent::new(
612            "https://issuer.example",
613            event_types::SESSION_REVOKED,
614            SubjectIdentifier::Email {
615                email: "user@x.com".to_string(),
616            },
617        );
618        assert!(!event.jti.is_empty());
619        assert!(event.iat > 0);
620        assert_eq!(event.event_type, event_types::SESSION_REVOKED);
621    }
622
623    #[test]
624    fn test_event_builder_chain() {
625        let event = CaepEvent::new(
626            "https://issuer.example",
627            event_types::CREDENTIAL_CHANGE,
628            SubjectIdentifier::Email {
629                email: "u@x.com".to_string(),
630            },
631        )
632        .with_initiating_entity(EventReasonAdmin::Admin)
633        .with_reason_admin("Password reset")
634        .with_reason_user("Your password was reset by admin")
635        .with_property("change_type", serde_json::json!("revoke"));
636
637        assert_eq!(event.initiating_entity, Some(EventReasonAdmin::Admin));
638        assert_eq!(event.reason_admin.as_deref(), Some("Password reset"));
639        assert!(event.properties.contains_key("change_type"));
640    }
641
642    #[test]
643    fn test_event_to_set_claims() {
644        let event = CaepEvent::new(
645            "https://iss.example",
646            event_types::SESSION_REVOKED,
647            SubjectIdentifier::Email {
648                email: "u@x.com".to_string(),
649            },
650        )
651        .with_reason_admin("Policy violation");
652
653        let claims = event.to_set_claims();
654        assert_eq!(claims["iss"], "https://iss.example");
655        assert!(claims["events"][event_types::SESSION_REVOKED].is_object());
656        let ev = &claims["events"][event_types::SESSION_REVOKED];
657        assert!(ev["subject"].is_object());
658        assert_eq!(ev["reason_admin"]["en"], "Policy violation");
659    }
660
661    // ── Transmitter ─────────────────────────────────────────────
662
663    #[tokio::test]
664    async fn test_transmitter_create_stream() {
665        let tx = CaepTransmitter::new("https://issuer.example");
666        let config =
667            StreamConfig::poll("https://issuer.example", "https://receiver.example/events")
668                .audience("https://receiver.example")
669                .supports_event(event_types::SESSION_REVOKED)
670                .build();
671        let stream_id = tx.create_stream(config.clone()).await;
672        assert!(!stream_id.is_empty());
673        assert_eq!(tx.active_stream_count().await, 1);
674
675        let retrieved = tx.get_stream_config(&stream_id).await.unwrap();
676        assert_eq!(retrieved.iss, config.iss);
677    }
678
679    #[tokio::test]
680    async fn test_transmitter_stream_status() {
681        let tx = CaepTransmitter::new("https://iss.example");
682        let id = tx
683            .create_stream(StreamConfig::poll("https://iss.example", "").build())
684            .await;
685
686        assert_eq!(tx.get_stream_status(&id).await, Some(StreamStatus::Enabled));
687
688        tx.set_stream_status(&id, StreamStatus::Paused)
689            .await
690            .unwrap();
691        assert_eq!(tx.get_stream_status(&id).await, Some(StreamStatus::Paused));
692        assert_eq!(tx.active_stream_count().await, 0);
693    }
694
695    #[tokio::test]
696    async fn test_transmitter_delete_stream() {
697        let tx = CaepTransmitter::new("https://iss.example");
698        let id = tx.create_stream(StreamConfig::push("", "").build()).await;
699
700        assert!(tx.delete_stream(&id).await);
701        assert!(!tx.delete_stream(&id).await); // second delete → false
702        assert_eq!(tx.active_stream_count().await, 0);
703    }
704
705    #[tokio::test]
706    async fn test_transmitter_emit_session_revoked() {
707        let tx = CaepTransmitter::new("https://iss.example");
708        let id = tx
709            .create_stream(
710                StreamConfig::poll("", "")
711                    .supports_event(event_types::SESSION_REVOKED)
712                    .build(),
713            )
714            .await;
715
716        let event = tx
717            .emit_session_revoked(
718                SubjectIdentifier::Email {
719                    email: "u@x.com".to_string(),
720                },
721                Some("Security policy"),
722            )
723            .await
724            .unwrap();
725
726        assert_eq!(event.event_type, event_types::SESSION_REVOKED);
727
728        let events = tx.poll_events(&id).await.unwrap();
729        assert_eq!(events.len(), 1);
730        assert_eq!(events[0].jti, event.jti);
731
732        // Second poll should be empty (events consumed)
733        let events2 = tx.poll_events(&id).await.unwrap();
734        assert!(events2.is_empty());
735    }
736
737    #[tokio::test]
738    async fn test_transmitter_event_filtering() {
739        let tx = CaepTransmitter::new("https://iss.example");
740
741        // Stream only wants session-revoked events
742        let id = tx
743            .create_stream(
744                StreamConfig::poll("", "")
745                    .supports_event(event_types::SESSION_REVOKED)
746                    .build(),
747            )
748            .await;
749
750        // Emit a credential_change event — should NOT appear
751        tx.emit_credential_change(
752            SubjectIdentifier::Email {
753                email: "u@x.com".to_string(),
754            },
755            ChangeType::Revoke,
756        )
757        .await
758        .unwrap();
759
760        let events = tx.poll_events(&id).await.unwrap();
761        assert!(events.is_empty());
762    }
763
764    #[tokio::test]
765    async fn test_transmitter_paused_stream_no_events() {
766        let tx = CaepTransmitter::new("https://iss.example");
767        let id = tx.create_stream(StreamConfig::poll("", "").build()).await;
768
769        tx.set_stream_status(&id, StreamStatus::Paused)
770            .await
771            .unwrap();
772
773        tx.emit_session_revoked(
774            SubjectIdentifier::Email {
775                email: "u@x.com".to_string(),
776            },
777            None,
778        )
779        .await
780        .unwrap();
781
782        // Paused streams can't be polled
783        assert!(tx.poll_events(&id).await.is_err());
784    }
785
786    // ── Receiver ────────────────────────────────────────────────
787
788    #[tokio::test]
789    async fn test_receiver_process_event() {
790        let rx = CaepReceiver::new();
791        let called = Arc::new(std::sync::atomic::AtomicBool::new(false));
792        let called_clone = called.clone();
793
794        rx.on_event(
795            event_types::SESSION_REVOKED,
796            Arc::new(move |_event| {
797                called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
798            }),
799        )
800        .await;
801
802        let event = CaepEvent::new(
803            "https://iss.example",
804            event_types::SESSION_REVOKED,
805            SubjectIdentifier::Email {
806                email: "u@x.com".to_string(),
807            },
808        );
809
810        let processed = rx.process_event(&event).await.unwrap();
811        assert!(processed);
812        assert!(called.load(std::sync::atomic::Ordering::SeqCst));
813    }
814
815    #[test]
816    fn test_stream_config_builder() {
817        let config =
818            StreamConfig::poll("https://issuer.example", "https://receiver.example/events")
819                .audience("https://receiver.example")
820                .audiences(["https://backup.example"])
821                .supports_event(event_types::SESSION_REVOKED)
822                .events_supported([event_types::CREDENTIAL_CHANGE])
823                .build();
824
825        assert_eq!(config.delivery_method, DeliveryMethod::Poll);
826        assert_eq!(config.aud.len(), 2);
827        assert_eq!(config.events_supported.len(), 2);
828    }
829
830    #[tokio::test]
831    async fn test_receiver_deduplication() {
832        let rx = CaepReceiver::new();
833        let event = CaepEvent::new(
834            "https://iss.example",
835            event_types::CREDENTIAL_CHANGE,
836            SubjectIdentifier::Opaque {
837                id: "x".to_string(),
838            },
839        );
840
841        let first = rx.process_event(&event).await.unwrap();
842        assert!(first);
843
844        let second = rx.process_event(&event).await.unwrap();
845        assert!(!second); // duplicate
846
847        assert!(rx.was_processed(&event.jti).await);
848    }
849
850    #[tokio::test]
851    async fn test_receiver_unhandled_event_type() {
852        let rx = CaepReceiver::new();
853        let event = CaepEvent::new(
854            "https://iss.example",
855            "custom:unknown-event",
856            SubjectIdentifier::Opaque {
857                id: "x".to_string(),
858            },
859        );
860        // Should succeed even without a registered handler
861        let processed = rx.process_event(&event).await.unwrap();
862        assert!(processed);
863    }
864
865    // ── Delivery method serialization ───────────────────────────
866
867    #[test]
868    fn test_delivery_method_serialization() {
869        assert_eq!(
870            serde_json::to_string(&DeliveryMethod::Push).unwrap(),
871            r#""push""#
872        );
873        assert_eq!(
874            serde_json::to_string(&DeliveryMethod::Poll).unwrap(),
875            r#""poll""#
876        );
877    }
878}