kube_runtime/
events.rs

1//! Publishes events for objects for kubernetes >= 1.19
2use k8s_openapi::{
3    api::{
4        core::v1::ObjectReference,
5        events::v1::{Event as K8sEvent, EventSeries},
6    },
7    apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta},
8    jiff::{SignedDuration, Timestamp},
9};
10use kube_client::{
11    Client, ResourceExt,
12    api::{Api, Patch, PatchParams, PostParams},
13};
14use std::{
15    collections::HashMap,
16    hash::{Hash, Hasher},
17    sync::Arc,
18};
19use tokio::sync::RwLock;
20
21const CACHE_TTL: SignedDuration = SignedDuration::from_mins(6);
22
23/// Minimal event type for publishing through [`Recorder::publish`].
24///
25/// All string fields must be human readable.
26pub struct Event {
27    /// The event severity.
28    ///
29    /// Shows up in `kubectl describe` as `Type`.
30    pub type_: EventType,
31
32    /// The short reason explaining why the `action` was taken.
33    ///
34    /// This must be at most 128 characters, generally in `PascalCase`. Shows up in `kubectl describe` as `Reason`.
35    pub reason: String,
36
37    /// A optional description of the status of the `action`.
38    ///
39    /// This must be at most 1kB in size. Shows up in `kubectl describe` as `Message`.
40    pub note: Option<String>,
41
42    /// The action that was taken (either successfully or unsuccessfully) against main object
43    ///
44    /// This must be at most 128 characters. It does not currently show up in `kubectl describe`.
45    /// A common convention is a short identifier of the action that caused the outcome described in `reason`.
46    /// Usually denoted in `PascalCase`.
47    pub action: String,
48
49    /// Optional secondary object related to the main object
50    ///
51    /// Some events are emitted for actions that affect multiple objects.
52    /// `secondary` can be populated to capture this detail.
53    ///
54    /// For example: the event concerns a `Deployment` and it affects the current `ReplicaSet` underneath it.
55    /// You would therefore populate `events` using the object reference of the `ReplicaSet`.
56    ///
57    /// Set `secondary` to `None`, instead, if the event affects only the object whose reference
58    /// you passed to [`Recorder::new`].
59    ///
60    /// # Naming note
61    ///
62    /// `secondary` is mapped to `related` in
63    /// [`Events API`](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io).
64    ///
65    /// [`Recorder::new`]: crate::events::Recorder::new
66    pub secondary: Option<ObjectReference>,
67}
68
/// Severity (type) of a published event.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
    /// A routine occurrence; nothing to worry about.
    Normal,
    /// Something did not work as expected and may be worth a look.
    Warning,
}
77
78/// [`ObjectReference`] with Hash and Eq implementations
79///
80/// [`ObjectReference`]: k8s_openapi::api::core::v1::ObjectReference
81#[derive(Clone, Debug, PartialEq)]
82pub struct Reference(ObjectReference);
83
84impl Eq for Reference {}
85
86impl Hash for Reference {
87    fn hash<H: Hasher>(&self, state: &mut H) {
88        self.0.api_version.hash(state);
89        self.0.kind.hash(state);
90        self.0.name.hash(state);
91        self.0.namespace.hash(state);
92        self.0.uid.hash(state);
93    }
94}
95
96/// Cache key for event deduplication
97#[derive(Clone, Debug, PartialEq, Eq, Hash)]
98struct EventKey {
99    pub event_type: EventType,
100    pub action: String,
101    pub reason: String,
102    pub reporting_controller: String,
103    pub reporting_instance: Option<String>,
104    pub regarding: Reference,
105    pub related: Option<Reference>,
106}
107
/// Identifies the controller that reports events.
///
/// ```
/// use kube::runtime::events::Reporter;
///
/// let reporter = Reporter {
///     controller: "my-awesome-controller".into(),
///     instance: std::env::var("CONTROLLER_POD_NAME").ok(),
/// };
/// ```
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct Reporter {
    /// Name of the controller publishing the event.
    ///
    /// This is likely your deployment.metadata.name.
    pub controller: String,

    /// Id of the controller instance publishing the event. Likely your pod name.
    ///
    /// This helps to tell apart where events came from when the controller runs
    /// with more than one replica.
    ///
    /// The pod name can be looked up through the Kubernetes API, or injected as an
    /// environment variable by adding
    ///
    /// ```yaml
    /// env:
    ///   - name: CONTROLLER_POD_NAME
    ///     valueFrom:
    ///       fieldRef:
    ///         fieldPath: metadata.name
    /// ```
    ///
    /// to the manifest of your controller.
    ///
    /// Note: converting a `&str` into a `Reporter` fills `instance` with the hostname when it
    /// is available. Whenever `instance` is `None`, `reporting_instance` in the published
    /// `Event` defaults to `reporting_controller`.
    pub instance: Option<String>,
}
147
148// simple conversions for when instance == controller
149impl From<String> for Reporter {
150    fn from(es: String) -> Self {
151        Self {
152            controller: es,
153            instance: None,
154        }
155    }
156}
157
158impl From<&str> for Reporter {
159    fn from(es: &str) -> Self {
160        let instance = hostname::get().ok().and_then(|h| h.into_string().ok());
161        Self {
162            controller: es.into(),
163            instance,
164        }
165    }
166}
167
168/// A publisher abstraction to emit Kubernetes' events.
169///
170/// All events emitted by an `Recorder` are attached to the [`ObjectReference`]
171/// specified when building the recorder using [`Recorder::new`].
172///
173/// ```
174/// use kube::runtime::events::{Reporter, Recorder, Event, EventType};
175/// use k8s_openapi::api::core::v1::ObjectReference;
176///
177/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
178/// # let client: kube::Client = todo!();
179/// let reporter = Reporter {
180///     controller: "my-awesome-controller".into(),
181///     instance: std::env::var("CONTROLLER_POD_NAME").ok(),
182/// };
183///
184/// let recorder = Recorder::new(client, reporter);
185///
186/// // references can be made manually using `ObjectMeta` and `ApiResource`/`Resource` info
187/// let reference = ObjectReference {
188///     // [...]
189///     ..Default::default()
190/// };
191/// // or for k8s-openapi / kube-derive types, use Resource::object_ref:
192/// // let reference = myobject.object_ref();
193/// recorder
194///     .publish(
195///         &Event {
196///             action: "Scheduling".into(),
197///             reason: "Pulling".into(),
198///             note: Some("Pulling image `nginx`".into()),
199///             type_: EventType::Normal,
200///             secondary: None,
201///         },
202///         &reference,
203///     ).await?;
204/// # Ok(())
205/// # }
206/// ```
207///
208/// Events attached to an object will be shown in the `Events` section of the output of
209/// of `kubectl describe` for that object.
210///
211/// ## RBAC
212///
213/// Note that usage of the event recorder minimally requires the following RBAC rules:
214///
215/// ```yaml
216/// - apiGroups: ["events.k8s.io"]
217///   resources: ["events"]
218///   verbs: ["create", "patch"]
219/// ```
220#[derive(Clone)]
221pub struct Recorder {
222    client: Client,
223    reporter: Reporter,
224    cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
225}
226
227impl Recorder {
228    /// Create a new recorder that can publish events for one specific object
229    ///
230    /// This is intended to be created at the start of your controller's reconcile fn.
231    ///
232    /// Cluster scoped objects will publish events in the "default" namespace.
233    #[must_use]
234    pub fn new(client: Client, reporter: Reporter) -> Self {
235        let cache = Arc::default();
236        Self {
237            client,
238            reporter,
239            cache,
240        }
241    }
242
243    /// Builds unique event key based on reportingController, reportingInstance, regarding, reason
244    ///  and note
245    fn get_event_key(&self, ev: &Event, regarding: &ObjectReference) -> EventKey {
246        EventKey {
247            event_type: ev.type_,
248            action: ev.action.clone(),
249            reason: ev.reason.clone(),
250            reporting_controller: self.reporter.controller.clone(),
251            reporting_instance: self.reporter.instance.clone(),
252            regarding: Reference(regarding.clone()),
253            related: ev.secondary.clone().map(Reference),
254        }
255    }
256
257    // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.35/#event-v1-events-k8s-io
258    // for more detail on the fields
259    // and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125
260    fn generate_event(&self, ev: &Event, reference: &ObjectReference) -> K8sEvent {
261        let now = Timestamp::now();
262        K8sEvent {
263            action: Some(ev.action.clone()),
264            reason: Some(ev.reason.clone()),
265            deprecated_count: None,
266            deprecated_first_timestamp: None,
267            deprecated_last_timestamp: None,
268            deprecated_source: None,
269            event_time: Some(MicroTime(now)),
270            regarding: Some(reference.clone()),
271            note: ev.note.clone(),
272            metadata: ObjectMeta {
273                namespace: reference.namespace.clone(),
274                name: Some(format!(
275                    "{}.{:x}",
276                    reference.name.as_ref().unwrap_or(&self.reporter.controller),
277                    now.as_nanosecond()
278                )),
279                ..Default::default()
280            },
281            reporting_controller: Some(self.reporter.controller.clone()),
282            reporting_instance: Some(
283                self.reporter
284                    .instance
285                    .clone()
286                    .unwrap_or_else(|| self.reporter.controller.clone()),
287            ),
288            series: None,
289            type_: match ev.type_ {
290                EventType::Normal => Some("Normal".into()),
291                EventType::Warning => Some("Warning".into()),
292            },
293            related: ev.secondary.clone(),
294        }
295    }
296
297    /// Publish a new Kubernetes' event.
298    ///
299    /// # Access control
300    ///
301    /// The event object is created in the same namespace of the [`ObjectReference`].
302    /// Make sure that your controller has `create` permissions in the required namespaces
303    /// for the `event` resource in the API group `events.k8s.io`.
304    ///
305    /// # Errors
306    ///
307    /// Returns an [`Error`](`kube_client::Error`) if the event is rejected by Kubernetes.
308    pub async fn publish(&self, ev: &Event, reference: &ObjectReference) -> Result<(), kube_client::Error> {
309        let now = Timestamp::now();
310
311        // gc past events older than now + CACHE_TTL
312        self.cache.write().await.retain(|_, v| {
313            if let Some(series) = v.series.as_ref() {
314                series.last_observed_time.0 + CACHE_TTL > now
315            } else if let Some(event_time) = v.event_time.as_ref() {
316                event_time.0 + CACHE_TTL > now
317            } else {
318                true
319            }
320        });
321
322        let key = self.get_event_key(ev, reference);
323        let event = match self.cache.read().await.get(&key) {
324            Some(e) => {
325                let count = if let Some(s) = &e.series { s.count + 1 } else { 2 };
326                let series = EventSeries {
327                    count,
328                    last_observed_time: MicroTime(now),
329                };
330                let mut event = e.clone();
331                event.series = Some(series);
332                event
333            }
334            None => self.generate_event(ev, reference),
335        };
336
337        let events = Api::namespaced(
338            self.client.clone(),
339            reference.namespace.as_ref().unwrap_or(&"default".to_string()),
340        );
341        if event.series.is_some() {
342            events
343                .patch(&event.name_any(), &PatchParams::default(), &Patch::Merge(&event))
344                .await?;
345        } else {
346            events.create(&PostParams::default(), &event).await?;
347        }
348
349        {
350            let mut cache = self.cache.write().await;
351            cache.insert(key, event);
352        }
353        Ok(())
354    }
355}
356
#[cfg(test)]
mod test {
    use super::{Event, EventKey, EventType, Recorder, Reference, Reporter};

    use k8s_openapi::{
        api::{
            core::v1::{ComponentStatus, Service},
            events::v1::Event as K8sEvent,
        },
        apimachinery::pkg::apis::meta::v1::MicroTime,
        jiff::{SignedDuration, Timestamp},
    };
    use kube::{Api, Client, Resource};

    /// Builds a `Normal` test event with the given reason and note.
    fn test_event(reason: &str, note: &str) -> Event {
        Event {
            type_: EventType::Normal,
            reason: reason.into(),
            note: Some(note.into()),
            action: "Test event - plz ignore".into(),
            secondary: None,
        }
    }

    /// Lists `events` and returns the first one whose reason equals `reason`.
    ///
    /// Panics when no such event is listed.
    async fn first_event_with_reason(
        events: &Api<K8sEvent>,
        reason: &str,
    ) -> Result<K8sEvent, Box<dyn std::error::Error>> {
        let listed = events.list(&Default::default()).await?;
        let found = listed
            .into_iter()
            .find(|e| e.reason.as_deref() == Some(reason))
            .unwrap();
        Ok(found)
    }

    // Publishing the same event twice must create it first and turn it into a series afterwards.
    #[tokio::test]
    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
    async fn event_recorder_attaches_events() -> Result<(), Box<dyn std::error::Error>> {
        const REASON: &str = "VeryCoolService";

        let client = Client::try_default().await?;
        let services: Api<Service> = Api::namespaced(client.clone(), "default");
        // always a kubernetes service in default
        let reference = services.get("kubernetes").await?.object_ref(&());
        let recorder = Recorder::new(client.clone(), "kube".into());
        let events: Api<K8sEvent> = Api::namespaced(client, "default");

        recorder
            .publish(&test_event(REASON, "Sending kubernetes to detention"), &reference)
            .await?;
        let created = first_event_with_reason(&events, REASON).await?;
        assert_eq!(created.note.unwrap(), "Sending kubernetes to detention");

        recorder
            .publish(
                &test_event(REASON, "Sending kubernetes to detention twice"),
                &reference,
            )
            .await?;
        let repeated = first_event_with_reason(&events, REASON).await?;
        assert!(repeated.series.is_some());

        Ok(())
    }

    // Same flow for a cluster scoped object: its events land in the "default" namespace.
    #[tokio::test]
    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
    async fn event_recorder_attaches_events_without_namespace() -> Result<(), Box<dyn std::error::Error>> {
        const REASON: &str = "VeryCoolServiceNoNamespace";

        let client = Client::try_default().await?;
        let component_statuses: Api<ComponentStatus> = Api::all(client.clone());
        let reference = component_statuses.get("scheduler").await?.object_ref(&());
        let recorder = Recorder::new(client.clone(), "kube".into());
        let events: Api<K8sEvent> = Api::namespaced(client, "default");

        recorder
            .publish(
                &test_event(REASON, "Sending kubernetes to detention without namespace"),
                &reference,
            )
            .await?;
        let created = first_event_with_reason(&events, REASON).await?;
        assert_eq!(
            created.note.unwrap(),
            "Sending kubernetes to detention without namespace"
        );

        recorder
            .publish(
                &test_event(
                    REASON,
                    "Sending kubernetes to detention without namespace twice",
                ),
                &reference,
            )
            .await?;
        let repeated = first_event_with_reason(&events, REASON).await?;
        assert!(repeated.series.is_some());

        Ok(())
    }

    // A cached event older than the cache TTL must be dropped, so a repeat is created anew
    // rather than recorded as a series.
    #[tokio::test]
    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
    async fn event_recorder_cache_retain() -> Result<(), Box<dyn std::error::Error>> {
        const REASON: &str = "TestCacheTtl";

        let client = Client::try_default().await?;
        let services: Api<Service> = Api::namespaced(client.clone(), "default");
        // always a kubernetes service in default
        let reference = services.get("kubernetes").await?.object_ref(&());

        let ev = test_event(REASON, "Sending kubernetes to detention");
        let reporter = Reporter {
            controller: "kube".into(),
            instance: None,
        };
        // The key the recorder computes for `ev` with the reporter above.
        let key = EventKey {
            event_type: ev.type_,
            action: ev.action.clone(),
            reason: ev.reason.clone(),
            reporting_controller: reporter.controller.clone(),
            reporting_instance: None,
            regarding: Reference(reference.clone()),
            related: None,
        };
        let recorder = Recorder::new(client.clone(), reporter);

        recorder.publish(&ev, &reference).await?;

        // Age the cached entry beyond the TTL so that the next publish evicts it.
        let past = Timestamp::now() - SignedDuration::from_mins(10);
        if let Some(cached) = recorder.cache.write().await.get_mut(&key) {
            cached.event_time = Some(MicroTime(past));
        }

        recorder.publish(&ev, &reference).await?;

        let events: Api<K8sEvent> = Api::namespaced(client, "default");
        let found = first_event_with_reason(&events, REASON).await?;
        assert_eq!(found.note.unwrap(), "Sending kubernetes to detention");
        assert!(found.series.is_none());

        Ok(())
    }
}