1use crate::bson::oid::ObjectId;
2
3use crate::{
4 bson_util::doc_to_json_str,
5 event::sdam::{
6 SdamEvent,
7 ServerClosedEvent,
8 ServerDescriptionChangedEvent,
9 ServerHeartbeatFailedEvent,
10 ServerHeartbeatStartedEvent,
11 ServerHeartbeatSucceededEvent,
12 ServerOpeningEvent,
13 TopologyClosedEvent,
14 TopologyDescription,
15 TopologyDescriptionChangedEvent,
16 TopologyOpeningEvent,
17 },
18};
19
20use super::{
21 trace_or_log_enabled,
22 TracingOrLogLevel,
23 TracingRepresentation,
24 DEFAULT_MAX_DOCUMENT_LENGTH_BYTES,
25 TOPOLOGY_TRACING_EVENT_TARGET,
26};
27
28impl TracingRepresentation for TopologyDescription {
29 type Representation = String;
30
31 fn tracing_representation(&self) -> Self::Representation {
32 self.to_string()
33 }
34}
35
36pub(crate) struct TopologyTracingEventEmitter {
37 max_document_length_bytes: usize,
38 topology_id: ObjectId,
39}
40
41impl TopologyTracingEventEmitter {
42 pub(crate) fn new(
43 max_document_length_bytes: Option<usize>,
44 topology_id: ObjectId,
45 ) -> TopologyTracingEventEmitter {
46 TopologyTracingEventEmitter {
47 max_document_length_bytes: max_document_length_bytes
48 .unwrap_or(DEFAULT_MAX_DOCUMENT_LENGTH_BYTES),
49 topology_id,
50 }
51 }
52}
53
54impl TopologyTracingEventEmitter {
55 pub(crate) fn handle(&self, event: SdamEvent) {
56 use SdamEvent::*;
57 match event {
58 ServerDescriptionChanged(ev) => self.handle_server_description_changed_event(*ev),
59 ServerOpening(ev) => self.handle_server_opening_event(ev),
60 ServerClosed(ev) => self.handle_server_closed_event(ev),
61 TopologyDescriptionChanged(ev) => self.handle_topology_description_changed_event(*ev),
62 TopologyOpening(ev) => self.handle_topology_opening_event(ev),
63 TopologyClosed(ev) => self.handle_topology_closed_event(ev),
64 ServerHeartbeatStarted(ev) => self.handle_server_heartbeat_started_event(ev),
65 ServerHeartbeatSucceeded(ev) => self.handle_server_heartbeat_succeeded_event(ev),
66 ServerHeartbeatFailed(ev) => self.handle_server_heartbeat_failed_event(ev),
67 }
68 }
69
70 fn handle_server_description_changed_event(&self, _event: ServerDescriptionChangedEvent) {
71 }
75
76 fn handle_server_opening_event(&self, event: ServerOpeningEvent) {
77 if trace_or_log_enabled!(
78 target: TOPOLOGY_TRACING_EVENT_TARGET,
79 TracingOrLogLevel::Debug
80 ) {
81 tracing::debug!(
82 target: TOPOLOGY_TRACING_EVENT_TARGET,
83 topologyId = self.topology_id.tracing_representation(),
84 serverHost = event.address.host().as_ref(),
85 serverPort = event.address.port_tracing_representation(),
86 "Starting server monitoring"
87 )
88 }
89 }
90
91 fn handle_server_closed_event(&self, event: ServerClosedEvent) {
92 if trace_or_log_enabled!(
93 target: TOPOLOGY_TRACING_EVENT_TARGET,
94 TracingOrLogLevel::Debug
95 ) {
96 tracing::debug!(
97 target: TOPOLOGY_TRACING_EVENT_TARGET,
98 topologyId = self.topology_id.tracing_representation(),
99 serverHost = event.address.host().as_ref(),
100 serverPort = event.address.port_tracing_representation(),
101 "Stopped server monitoring"
102 )
103 }
104 }
105
106 fn handle_topology_description_changed_event(&self, event: TopologyDescriptionChangedEvent) {
107 if trace_or_log_enabled!(
108 target: TOPOLOGY_TRACING_EVENT_TARGET,
109 TracingOrLogLevel::Debug
110 ) {
111 tracing::debug!(
112 target: TOPOLOGY_TRACING_EVENT_TARGET,
113 topologyId = self.topology_id.tracing_representation(),
114 previousDescription = event.previous_description.tracing_representation(),
115 newDescription = event.new_description.tracing_representation(),
116 "Topology description changed"
117 )
118 }
119 }
120
121 fn handle_topology_opening_event(&self, _event: TopologyOpeningEvent) {
122 if trace_or_log_enabled!(
123 target: TOPOLOGY_TRACING_EVENT_TARGET,
124 TracingOrLogLevel::Debug
125 ) {
126 tracing::debug!(
127 target: TOPOLOGY_TRACING_EVENT_TARGET,
128 topologyId = self.topology_id.tracing_representation(),
129 "Starting topology monitoring"
130 )
131 }
132 }
133
134 fn handle_topology_closed_event(&self, _event: TopologyClosedEvent) {
135 if trace_or_log_enabled!(
136 target: TOPOLOGY_TRACING_EVENT_TARGET,
137 TracingOrLogLevel::Debug
138 ) {
139 tracing::debug!(
140 target: TOPOLOGY_TRACING_EVENT_TARGET,
141 topologyId = self.topology_id.tracing_representation(),
142 "Stopped topology monitoring"
143 )
144 }
145 }
146
147 fn handle_server_heartbeat_started_event(&self, event: ServerHeartbeatStartedEvent) {
148 if trace_or_log_enabled!(
149 target: TOPOLOGY_TRACING_EVENT_TARGET,
150 TracingOrLogLevel::Debug
151 ) {
152 tracing::debug!(
153 target: TOPOLOGY_TRACING_EVENT_TARGET,
154 topologyId = self.topology_id.tracing_representation(),
155 serverHost = event.server_address.host().as_ref(),
156 serverPort = event.server_address.port_tracing_representation(),
157 driverConnectionId = event.driver_connection_id,
158 serverConnectionId = event.server_connection_id,
159 awaited = event.awaited,
160 "Server heartbeat started"
161 )
162 }
163 }
164
165 fn handle_server_heartbeat_succeeded_event(&self, event: ServerHeartbeatSucceededEvent) {
166 if trace_or_log_enabled!(
167 target: TOPOLOGY_TRACING_EVENT_TARGET,
168 TracingOrLogLevel::Debug
169 ) {
170 tracing::debug!(
171 target: TOPOLOGY_TRACING_EVENT_TARGET,
172 topologyId = self.topology_id.tracing_representation(),
173 serverHost = event.server_address.host().as_ref(),
174 serverPort = event.server_address.port_tracing_representation(),
175 driverConnectionId = event.driver_connection_id,
176 serverConnectionId = event.server_connection_id,
177 awaited = event.awaited,
178 reply = doc_to_json_str(event.reply, self.max_document_length_bytes),
179 durationMS = event.duration.as_millis(),
180 "Server heartbeat succeeded"
181 )
182 }
183 }
184
185 fn handle_server_heartbeat_failed_event(&self, event: ServerHeartbeatFailedEvent) {
186 if trace_or_log_enabled!(
187 target: TOPOLOGY_TRACING_EVENT_TARGET,
188 TracingOrLogLevel::Debug
189 ) {
190 tracing::debug!(
191 target: TOPOLOGY_TRACING_EVENT_TARGET,
192 topologyId = self.topology_id.tracing_representation(),
193 serverHost = event.server_address.host().as_ref(),
194 serverPort = event.server_address.port_tracing_representation(),
195 driverConnectionId = event.driver_connection_id,
196 serverConnectionId = event.server_connection_id,
197 awaited = event.awaited,
198 failure = event.failure.tracing_representation(self.max_document_length_bytes),
199 durationMS = event.duration.as_millis(),
200 "Server heartbeat failed"
201 )
202 }
203 }
204}