spiffe 0.13.0

Core SPIFFE identity types and Workload API sources
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
use super::builder::{ReconnectConfig, ResourceLimits};
use super::errors::{JwtSourceError, MetricsErrorKind};
use super::limits::validate_bundle_set;
use super::metrics::MetricsRecorder;
use super::source::Inner;
use super::types::ClientFactory;
use crate::bundle::jwt::JwtBundleSet;
use crate::prelude::{debug, info, warn};
use crate::workload_api::error::WorkloadApiError;
use crate::workload_api::supervisor_common::{
    self, ErrorKey, ErrorTracker, StreamPhase, MAX_CONSECUTIVE_SAME_ERROR,
};
use crate::workload_api::WorkloadApiClient;
use futures::StreamExt as _;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

/// Builds a fresh Workload API client through `make_client`.
///
/// On success, `error_tracker` is cleared only when the streak it currently holds is a run of
/// client-creation failures. Streaks of other kinds (for example stream-connect or
/// `NoIdentityIssued`) are left intact. A debug-level recovery message is emitted when the
/// cleared streak was at least 3 failures long.
///
/// On failure, the error is recorded in `error_tracker`. Its verdict selects between a `warn!`
/// and a `debug!` log line. A `ClientCreation` error metric is recorded when `metrics` is set,
/// and the error is returned unchanged. `backoff` is used only for the log output.
///
/// Reconnect metrics are not recorded here. Callers record them for steady-state reconnections
/// only, never for the initial sync.
pub(super) async fn try_create_client(
    make_client: &ClientFactory,
    backoff: Duration,
    error_tracker: &mut ErrorTracker,
    metrics: Option<&dyn MetricsRecorder>,
) -> Result<WorkloadApiClient, WorkloadApiError> {
    let err = match (make_client)().await {
        Ok(client) => {
            // Only a client-creation streak counts as "recovered" here; other error kinds are
            // owned by the stream-connection logic and must survive a successful client build.
            if error_tracker.last_error_kind() == Some(ErrorKey::ClientCreation) {
                let streak = error_tracker.consecutive_count();
                if streak >= 3 {
                    debug!(
                        "Client creation recovered after {} consecutive failures",
                        streak
                    );
                }
                error_tracker.reset();
            }
            return Ok(client);
        }
        Err(err) => err,
    };

    if error_tracker.record_error(ErrorKey::ClientCreation) {
        warn!(
            "Failed to create WorkloadApiClient; retrying: error={}, backoff_ms={}",
            err,
            backoff.as_millis()
        );
    } else {
        debug!(
            "Failed to create WorkloadApiClient (repeated); retrying: error={}, backoff_ms={}, consecutive_failures={}",
            err,
            backoff.as_millis(),
            error_tracker.consecutive_count()
        );
    }

    if let Some(recorder) = metrics {
        recorder.record_error(MetricsErrorKind::ClientCreation);
    }
    Err(err)
}

/// Opens the Workload API JWT bundle stream on `client`.
///
/// On success, any streak in `error_tracker` is cleared. If the cleared streak consisted of
/// stream-connect failures, a recovery message is logged first. A "connected" message follows,
/// tagged with `phase` and, when given, `supervisor_id`.
///
/// On failure:
/// - `WorkloadApiError::NoIdentityIssued` is tracked under its own `ErrorKey` and logged as a
///   transient wait. No metric is recorded for it.
/// - Any other error is tracked as `StreamConnect`, logged with `backoff`, and recorded as a
///   `StreamConnect` error metric when `metrics` is set.
///
/// The error is returned unchanged in both cases. This function never modifies the backoff;
/// `backoff` only appears in logs. Reconnect metrics are left to the caller (steady-state
/// reconnections only, not the initial sync).
pub(super) async fn try_connect_stream(
    client: &WorkloadApiClient,
    backoff: Duration,
    error_tracker: &mut ErrorTracker,
    metrics: Option<&dyn MetricsRecorder>,
    phase: StreamPhase,
    supervisor_id: Option<u64>,
) -> Result<
    impl futures::Stream<Item = Result<JwtBundleSet, WorkloadApiError>> + Send + 'static + use<>,
    WorkloadApiError,
> {
    let err = match client.stream_jwt_bundles().await {
        Ok(stream) => {
            let id_suffix = match supervisor_id {
                Some(id) => format!(", id={id}"),
                None => String::new(),
            };

            let failures = error_tracker.consecutive_count();
            if failures > 0 && error_tracker.last_error_kind() == Some(ErrorKey::StreamConnect) {
                info!(
                    "Stream connection recovered after {} consecutive failures (phase={:?}{})",
                    failures, phase, id_suffix
                );
            }
            error_tracker.reset();
            info!(
                "Connected to Workload API JWT bundle stream (phase={:?}{})",
                phase, id_suffix
            );
            return Ok(stream);
        }
        Err(err) => err,
    };

    if matches!(err, WorkloadApiError::NoIdentityIssued) {
        // An expected, transient state: tracked separately and deliberately not a metric.
        if error_tracker.record_error(ErrorKey::NoIdentityIssued) {
            warn!("No identity issued yet; waiting before retry");
        } else {
            debug!(
                "No identity issued yet (repeated); waiting before retry: consecutive_failures={}",
                error_tracker.consecutive_count()
            );
        }
        return Err(err);
    }

    if error_tracker.record_error(ErrorKey::StreamConnect) {
        warn!(
            "Failed to connect to Workload API stream; retrying: error={}, backoff_ms={}",
            err,
            backoff.as_millis()
        );
    } else {
        debug!(
            "Failed to connect to Workload API stream (repeated); retrying: error={}, backoff_ms={}, consecutive_failures={}",
            err,
            backoff.as_millis(),
            error_tracker.consecutive_count()
        );
    }

    if let Some(recorder) = metrics {
        recorder.record_error(MetricsErrorKind::StreamConnect);
    }
    Err(err)
}

pub(super) async fn initial_sync_with_retry(
    make_client: &ClientFactory,
    cancel: &CancellationToken,
    reconnect: ReconnectConfig,
    limits: ResourceLimits,
    metrics: Option<&dyn MetricsRecorder>,
) -> Result<Arc<JwtBundleSet>, JwtSourceError> {
    let mut backoff = reconnect.min_backoff;
    let mut error_tracker = ErrorTracker::new(MAX_CONSECUTIVE_SAME_ERROR);

    loop {
        if cancel.is_cancelled() {
            return Err(JwtSourceError::Closed);
        }

        match try_sync_once(make_client, limits, metrics, backoff, &mut error_tracker).await {
            Ok(v) => return Ok(v),
            Err(e) => {
                // Record InitialSyncFailed as an umbrella metric for any failed attempt.
                // Specific metrics (ClientCreation, StreamConnect, StreamError, StreamEnded,
                // LimitMaxBundles, LimitMaxBundleJwksBytes) are already recorded in try_sync_once().
                // Detailed logs are also produced by try_create_client/try_connect_stream/stream read,
                // so we avoid duplicate outer logs here to reduce noise.
                if let Some(m) = metrics {
                    m.record_error(MetricsErrorKind::InitialSyncFailed);
                }
                if sleep_or_cancel(cancel, backoff).await {
                    return Err(JwtSourceError::Closed);
                }
                // Choose backoff policy based on error type
                match &e {
                    JwtSourceError::Source(WorkloadApiError::NoIdentityIssued) => {
                        // Use slower backoff for "no identity issued" (expected transient state)
                        backoff = next_backoff_for_no_identity(backoff, reconnect.max_backoff);
                        warn!(
                            "Initial sync: no identity issued, using backoff_ms={}",
                            backoff.as_millis()
                        );
                    }
                    _ => {
                        // Use standard exponential backoff for other errors
                        backoff = next_backoff(backoff, reconnect.max_backoff);
                    }
                }
            }
        }
    }
}

async fn try_sync_once(
    make_client: &ClientFactory,
    limits: ResourceLimits,
    metrics: Option<&dyn MetricsRecorder>,
    backoff: Duration,
    error_tracker: &mut ErrorTracker,
) -> Result<Arc<JwtBundleSet>, JwtSourceError> {
    // Use shared client creation logic (records ClientCreation metric on failure).
    // Initial sync does not record reconnect metrics (it's not a reconnect).
    let client = match try_create_client(make_client, backoff, error_tracker, metrics).await {
        Ok(c) => c,
        Err(e) => {
            return Err(JwtSourceError::Source(e));
        }
    };

    // Use shared stream connection logic (records StreamConnect metric on failure).
    let mut stream = match try_connect_stream(
        &client,
        backoff,
        error_tracker,
        metrics,
        StreamPhase::InitialSync,
        None,
    )
    .await
    {
        Ok(s) => s,
        Err(e) => {
            return Err(JwtSourceError::Source(e));
        }
    };

    match stream.next().await {
        Some(Ok(bundle_set)) => {
            validate_bundle_set(&bundle_set, limits, metrics).inspect_err(|e| {
                warn!("Initial JWT bundle set rejected; will retry: error={e}");
            })?;

            Ok(Arc::new(bundle_set))
        }
        Some(Err(e)) => {
            // Record StreamError for stream read errors.
            warn!("Initial sync: Workload API stream error; will retry: error={e}");
            if let Some(m) = metrics {
                m.record_error(MetricsErrorKind::StreamError);
            }
            Err(JwtSourceError::Source(e))
        }
        None => {
            // Record StreamEnded for empty stream.
            warn!("Initial sync: Workload API stream ended immediately; will retry");
            if let Some(m) = metrics {
                m.record_error(MetricsErrorKind::StreamEnded);
            }
            Err(JwtSourceError::StreamEnded)
        }
    }
}

pub(super) use supervisor_common::{next_backoff, next_backoff_for_no_identity, sleep_or_cancel};

/// Outcome of draining one Workload API update stream until it stopped.
struct StreamResult {
    /// `true` when at least one received bundle set was applied before the stream stopped.
    had_successful_update: bool,
    /// `true` when draining stopped because the cancellation token fired.
    cancelled: bool,
}

impl Inner {
    pub(super) async fn run_update_supervisor(&self, cancellation_token: CancellationToken) {
        // Generate a unique supervisor ID for diagnostics (detects duplicate supervisors/repeated constructions).
        let supervisor_id = fastrand::u64(..);
        info!("Starting update supervisor: id={}", supervisor_id);

        let mut backoff = self.reconnect().min_backoff;
        let mut error_tracker = ErrorTracker::new(MAX_CONSECUTIVE_SAME_ERROR);

        loop {
            if cancellation_token.is_cancelled() {
                debug!("Cancellation signal received; stopping updates");
                return;
            }

            let Ok(client) = try_create_client(
                self.make_client(),
                backoff,
                &mut error_tracker,
                self.metrics(),
            )
            .await
            else {
                if self
                    .backoff_and_maybe_cancel(&cancellation_token, backoff)
                    .await
                {
                    return;
                }
                backoff = next_backoff(backoff, self.reconnect().max_backoff);
                continue;
            };

            match try_connect_stream(
                &client,
                backoff,
                &mut error_tracker,
                self.metrics(),
                StreamPhase::Supervisor,
                Some(supervisor_id),
            )
            .await
            {
                Ok(mut stream) => {
                    let result = self
                        .process_stream_updates(&mut stream, &cancellation_token, supervisor_id)
                        .await;
                    if result.cancelled {
                        return;
                    }

                    // Reset backoff only if we successfully processed at least one update,
                    // meaning the stream actually delivered useful data before failing.
                    if result.had_successful_update {
                        backoff = self.reconnect().min_backoff;
                    }

                    // Stream ended or errored. Sleep/backoff before retrying.
                    if self
                        .backoff_and_maybe_cancel(&cancellation_token, backoff)
                        .await
                    {
                        return;
                    }
                    if !result.had_successful_update {
                        backoff = next_backoff(backoff, self.reconnect().max_backoff);
                    }
                }
                Err(stream_err) => {
                    // Choose backoff policy based on error type
                    match stream_err {
                        WorkloadApiError::NoIdentityIssued => {
                            // Use slower backoff for "no identity issued" (expected transient state)
                            backoff =
                                next_backoff_for_no_identity(backoff, self.reconnect().max_backoff);
                            warn!(
                                "No identity issued: using backoff_ms={}",
                                backoff.as_millis()
                            );
                        }
                        _ => {
                            // Use standard exponential backoff for other errors
                            backoff = next_backoff(backoff, self.reconnect().max_backoff);
                        }
                    }

                    if self
                        .backoff_and_maybe_cancel(&cancellation_token, backoff)
                        .await
                    {
                        return;
                    }
                }
            }
        }
    }

    async fn backoff_and_maybe_cancel(&self, token: &CancellationToken, backoff: Duration) -> bool {
        if let Some(metrics) = self.metrics() {
            metrics.record_reconnect();
        }
        sleep_or_cancel(token, backoff).await
    }

    /// Processes stream updates until the stream ends, errors, or cancellation is requested.
    async fn process_stream_updates(
        &self,
        stream: &mut (impl futures::Stream<Item = Result<JwtBundleSet, WorkloadApiError>>
                  + Unpin
                  + Send
                  + 'static),
        cancellation_token: &CancellationToken,
        supervisor_id: u64,
    ) -> StreamResult {
        let mut update_rejection_tracker = ErrorTracker::new(MAX_CONSECUTIVE_SAME_ERROR);
        let mut had_successful_update = false;

        loop {
            let item = tokio::select! {
                () = cancellation_token.cancelled() => {
                    debug!("Cancellation signal received; stopping update loop");
                    return StreamResult { cancelled: true, had_successful_update };
                }
                v = stream.next() => v,
            };

            match item {
                Some(Ok(bundle_set)) => {
                    match self.apply_update(Arc::new(bundle_set)) {
                        Ok(()) => {
                            had_successful_update = true;
                            if update_rejection_tracker.consecutive_count() > 0 {
                                info!(
                                    "Update validation recovered after {} consecutive failures",
                                    update_rejection_tracker.consecutive_count(),
                                );
                                update_rejection_tracker.reset();
                            }
                            info!("JWT bundle set updated");
                        }
                        Err(e) => {
                            let should_warn =
                                update_rejection_tracker.record_error(ErrorKey::UpdateRejected);

                            if should_warn {
                                warn!("Rejected JWT bundle set update: error={e}");
                            } else {
                                debug!(
                                    "Rejected JWT bundle set update (repeated): error={}, consecutive_rejections={}",
                                    e,
                                    update_rejection_tracker.consecutive_count()
                                );
                            }
                            // Metrics already recorded by apply_update(); do not double-count.
                        }
                    }
                }
                Some(Err(e)) => {
                    warn!("Workload API stream error; reconnecting: id={supervisor_id}, error={e}");
                    self.record_error(MetricsErrorKind::StreamError);
                    return StreamResult {
                        cancelled: false,
                        had_successful_update,
                    };
                }
                None => {
                    warn!("Workload API stream ended; reconnecting: id={supervisor_id}");
                    self.record_error(MetricsErrorKind::StreamEnded);
                    return StreamResult {
                        cancelled: false,
                        had_successful_update,
                    };
                }
            }
        }
    }
}