Skip to main content

mongodb/trace/
topology.rs

1use crate::bson::oid::ObjectId;
2
3use crate::{
4    bson_util::doc_to_json_str,
5    event::sdam::{
6        SdamEvent,
7        ServerClosedEvent,
8        ServerDescriptionChangedEvent,
9        ServerHeartbeatFailedEvent,
10        ServerHeartbeatStartedEvent,
11        ServerHeartbeatSucceededEvent,
12        ServerOpeningEvent,
13        TopologyClosedEvent,
14        TopologyDescription,
15        TopologyDescriptionChangedEvent,
16        TopologyOpeningEvent,
17    },
18};
19
20use super::{
21    trace_or_log_enabled,
22    TracingOrLogLevel,
23    TracingRepresentation,
24    DEFAULT_MAX_DOCUMENT_LENGTH_BYTES,
25    TOPOLOGY_TRACING_EVENT_TARGET,
26};
27
28impl TracingRepresentation for TopologyDescription {
29    type Representation = String;
30
31    fn tracing_representation(&self) -> Self::Representation {
32        self.to_string()
33    }
34}
35
36pub(crate) struct TopologyTracingEventEmitter {
37    max_document_length_bytes: usize,
38    topology_id: ObjectId,
39}
40
41impl TopologyTracingEventEmitter {
42    pub(crate) fn new(
43        max_document_length_bytes: Option<usize>,
44        topology_id: ObjectId,
45    ) -> TopologyTracingEventEmitter {
46        TopologyTracingEventEmitter {
47            max_document_length_bytes: max_document_length_bytes
48                .unwrap_or(DEFAULT_MAX_DOCUMENT_LENGTH_BYTES),
49            topology_id,
50        }
51    }
52}
53
54impl TopologyTracingEventEmitter {
55    pub(crate) fn handle(&self, event: SdamEvent) {
56        use SdamEvent::*;
57        match event {
58            ServerDescriptionChanged(ev) => self.handle_server_description_changed_event(*ev),
59            ServerOpening(ev) => self.handle_server_opening_event(ev),
60            ServerClosed(ev) => self.handle_server_closed_event(ev),
61            TopologyDescriptionChanged(ev) => self.handle_topology_description_changed_event(*ev),
62            TopologyOpening(ev) => self.handle_topology_opening_event(ev),
63            TopologyClosed(ev) => self.handle_topology_closed_event(ev),
64            ServerHeartbeatStarted(ev) => self.handle_server_heartbeat_started_event(ev),
65            ServerHeartbeatSucceeded(ev) => self.handle_server_heartbeat_succeeded_event(ev),
66            ServerHeartbeatFailed(ev) => self.handle_server_heartbeat_failed_event(ev),
67        }
68    }
69
70    fn handle_server_description_changed_event(&self, _event: ServerDescriptionChangedEvent) {
71        // this is tentatively a no-op based on my proposal to not do separate "topology changed"
72        // and "server changed" log messages due to the redundancy, but that could change
73        // based on the spec review process.
74    }
75
76    fn handle_server_opening_event(&self, event: ServerOpeningEvent) {
77        if trace_or_log_enabled!(
78            target: TOPOLOGY_TRACING_EVENT_TARGET,
79            TracingOrLogLevel::Debug
80        ) {
81            tracing::debug!(
82                target: TOPOLOGY_TRACING_EVENT_TARGET,
83                topologyId = self.topology_id.tracing_representation(),
84                serverHost = event.address.host().as_ref(),
85                serverPort = event.address.port_tracing_representation(),
86                "Starting server monitoring"
87            )
88        }
89    }
90
91    fn handle_server_closed_event(&self, event: ServerClosedEvent) {
92        if trace_or_log_enabled!(
93            target: TOPOLOGY_TRACING_EVENT_TARGET,
94            TracingOrLogLevel::Debug
95        ) {
96            tracing::debug!(
97                target: TOPOLOGY_TRACING_EVENT_TARGET,
98                topologyId = self.topology_id.tracing_representation(),
99                serverHost = event.address.host().as_ref(),
100                serverPort = event.address.port_tracing_representation(),
101                "Stopped server monitoring"
102            )
103        }
104    }
105
106    fn handle_topology_description_changed_event(&self, event: TopologyDescriptionChangedEvent) {
107        if trace_or_log_enabled!(
108            target: TOPOLOGY_TRACING_EVENT_TARGET,
109            TracingOrLogLevel::Debug
110        ) {
111            tracing::debug!(
112                target: TOPOLOGY_TRACING_EVENT_TARGET,
113                topologyId = self.topology_id.tracing_representation(),
114                previousDescription = event.previous_description.tracing_representation(),
115                newDescription = event.new_description.tracing_representation(),
116                "Topology description changed"
117            )
118        }
119    }
120
121    fn handle_topology_opening_event(&self, _event: TopologyOpeningEvent) {
122        if trace_or_log_enabled!(
123            target: TOPOLOGY_TRACING_EVENT_TARGET,
124            TracingOrLogLevel::Debug
125        ) {
126            tracing::debug!(
127                target: TOPOLOGY_TRACING_EVENT_TARGET,
128                topologyId = self.topology_id.tracing_representation(),
129                "Starting topology monitoring"
130            )
131        }
132    }
133
134    fn handle_topology_closed_event(&self, _event: TopologyClosedEvent) {
135        if trace_or_log_enabled!(
136            target: TOPOLOGY_TRACING_EVENT_TARGET,
137            TracingOrLogLevel::Debug
138        ) {
139            tracing::debug!(
140                target: TOPOLOGY_TRACING_EVENT_TARGET,
141                topologyId = self.topology_id.tracing_representation(),
142                "Stopped topology monitoring"
143            )
144        }
145    }
146
147    fn handle_server_heartbeat_started_event(&self, event: ServerHeartbeatStartedEvent) {
148        if trace_or_log_enabled!(
149            target: TOPOLOGY_TRACING_EVENT_TARGET,
150            TracingOrLogLevel::Debug
151        ) {
152            tracing::debug!(
153                target: TOPOLOGY_TRACING_EVENT_TARGET,
154                topologyId = self.topology_id.tracing_representation(),
155                serverHost = event.server_address.host().as_ref(),
156                serverPort = event.server_address.port_tracing_representation(),
157                driverConnectionId = event.driver_connection_id,
158                serverConnectionId = event.server_connection_id,
159                awaited = event.awaited,
160                "Server heartbeat started"
161            )
162        }
163    }
164
165    fn handle_server_heartbeat_succeeded_event(&self, event: ServerHeartbeatSucceededEvent) {
166        if trace_or_log_enabled!(
167            target: TOPOLOGY_TRACING_EVENT_TARGET,
168            TracingOrLogLevel::Debug
169        ) {
170            tracing::debug!(
171                target: TOPOLOGY_TRACING_EVENT_TARGET,
172                topologyId = self.topology_id.tracing_representation(),
173                serverHost = event.server_address.host().as_ref(),
174                serverPort = event.server_address.port_tracing_representation(),
175                driverConnectionId = event.driver_connection_id,
176                serverConnectionId = event.server_connection_id,
177                awaited = event.awaited,
178                reply = doc_to_json_str(event.reply, self.max_document_length_bytes),
179                durationMS = event.duration.as_millis(),
180                "Server heartbeat succeeded"
181            )
182        }
183    }
184
185    fn handle_server_heartbeat_failed_event(&self, event: ServerHeartbeatFailedEvent) {
186        if trace_or_log_enabled!(
187            target: TOPOLOGY_TRACING_EVENT_TARGET,
188            TracingOrLogLevel::Debug
189        ) {
190            tracing::debug!(
191                target: TOPOLOGY_TRACING_EVENT_TARGET,
192                topologyId = self.topology_id.tracing_representation(),
193                serverHost = event.server_address.host().as_ref(),
194                serverPort = event.server_address.port_tracing_representation(),
195                driverConnectionId = event.driver_connection_id,
196                serverConnectionId = event.server_connection_id,
197                awaited = event.awaited,
198                failure = event.failure.tracing_representation(self.max_document_length_bytes),
199                durationMS = event.duration.as_millis(),
200                "Server heartbeat failed"
201            )
202        }
203    }
204}