zebra_network/peer/client.rs
//! Handles outbound requests from our node to the network.
2
3use std::{
4 collections::HashSet,
5 future::Future,
6 iter,
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll},
10};
11
12use futures::{
13 channel::{mpsc, oneshot},
14 future, ready,
15 stream::{Stream, StreamExt},
16 FutureExt,
17};
18use tokio::{sync::broadcast, task::JoinHandle};
19use tower::Service;
20
21use zebra_chain::diagnostic::task::CheckForPanics;
22
23use crate::{
24 peer::{
25 error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError},
26 ConnectionInfo,
27 },
28 peer_set::InventoryChange,
29 protocol::{
30 external::InventoryHash,
31 internal::{Request, Response},
32 },
33 BoxError, PeerSocketAddr,
34};
35
36#[cfg(any(test, feature = "proptest-impl"))]
37pub mod tests;
38
39/// The "client" duplex half of a peer connection.
40pub struct Client {
41 /// The metadata for the connected peer `service`.
42 pub connection_info: Arc<ConnectionInfo>,
43
44 /// Used to shut down the corresponding heartbeat.
45 /// This is always Some except when we take it on drop.
46 pub(crate) shutdown_tx: Option<oneshot::Sender<CancelHeartbeatTask>>,
47
48 /// Used to send [`Request`]s to the remote peer.
49 pub(crate) server_tx: mpsc::Sender<ClientRequest>,
50
51 /// Used to register missing inventory in client [`Response`]s,
52 /// so that the peer set can route retries to other clients.
53 pub(crate) inv_collector: broadcast::Sender<InventoryChange>,
54
55 /// A slot for an error shared between the Connection and the Client that uses it.
56 ///
57 /// `None` unless the connection or client have errored.
58 pub(crate) error_slot: ErrorSlot,
59
60 /// A handle to the task responsible for connecting to the peer.
61 pub(crate) connection_task: JoinHandle<()>,
62
63 /// A handle to the task responsible for sending periodic heartbeats.
64 pub(crate) heartbeat_task: JoinHandle<Result<(), BoxError>>,
65}
66
/// The signal a [`Client`] sends to cancel its heartbeat task.
///
/// The heartbeat task exits when it receives this signal.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct CancelHeartbeatTask;
73
74/// A message from the `peer::Client` to the `peer::Server`.
75#[derive(Debug)]
76pub(crate) struct ClientRequest {
77 /// The actual network request for the peer.
78 pub request: Request,
79
80 /// The response `Message` channel, included because `peer::Client::call`
81 /// returns a future that may be moved around before it resolves.
82 pub tx: oneshot::Sender<Result<Response, SharedPeerError>>,
83
84 /// Used to register missing inventory in responses on `tx`,
85 /// so that the peer set can route retries to other clients.
86 pub inv_collector: Option<broadcast::Sender<InventoryChange>>,
87
88 /// The peer address for registering missing inventory.
89 ///
90 /// TODO: replace this with `ConnectedAddr`?
91 pub transient_addr: Option<PeerSocketAddr>,
92
93 /// The tracing context for the request, so that work the connection task does
94 /// processing messages in the context of this request will have correct context.
95 pub span: tracing::Span,
96}
97
98/// A receiver for the `peer::Server`, which wraps a `mpsc::Receiver`,
99/// converting `ClientRequest`s into `InProgressClientRequest`s.
100#[derive(Debug)]
101pub(super) struct ClientRequestReceiver {
102 /// The inner receiver
103 inner: mpsc::Receiver<ClientRequest>,
104}
105
106/// A message from the `peer::Client` to the `peer::Server`,
107/// after it has been received by the `peer::Server`.
108#[derive(Debug)]
109#[must_use = "tx.send() must be called before drop"]
110pub(super) struct InProgressClientRequest {
111 /// The actual request.
112 pub request: Request,
113
114 /// The return message channel, included because `peer::Client::call` returns a
115 /// future that may be moved around before it resolves.
116 ///
117 /// INVARIANT: `tx.send()` must be called before dropping `tx`.
118 ///
119 /// JUSTIFICATION: the `peer::Client` translates `Request`s into
120 /// `ClientRequest`s, which it sends to a background task. If the send is
121 /// `Ok(())`, it will assume that it is safe to unconditionally poll the
122 /// `Receiver` tied to the `Sender` used to create the `ClientRequest`.
123 ///
124 /// We also take advantage of this invariant to route inventory requests
125 /// away from peers that did not respond with that inventory.
126 ///
127 /// We enforce this invariant via the type system, by converting
128 /// `ClientRequest`s to `InProgressClientRequest`s when they are received by
129 /// the background task. These conversions are implemented by
130 /// `ClientRequestReceiver`.
131 pub tx: MustUseClientResponseSender,
132
133 /// The tracing context for the request, so that work the connection task does
134 /// processing messages in the context of this request will have correct context.
135 pub span: tracing::Span,
136}
137
138/// A `oneshot::Sender` for client responses, that must be used by calling `send()`.
139/// Also handles forwarding missing inventory to the inventory registry.
140///
141/// Panics on drop if `tx` has not been used or canceled.
142/// Panics if `tx.send()` is used more than once.
143#[derive(Debug)]
144#[must_use = "tx.send() must be called before drop"]
145pub(super) struct MustUseClientResponseSender {
146 /// The sender for the oneshot client response channel.
147 ///
148 /// `None` if `tx.send()` has been used.
149 pub tx: Option<oneshot::Sender<Result<Response, SharedPeerError>>>,
150
151 /// Forwards missing inventory in the response to the inventory collector.
152 ///
153 /// Boxed to reduce the size of containing structures.
154 pub missing_inv: Option<Box<MissingInventoryCollector>>,
155}
156
157/// Forwards missing inventory in the response to the inventory registry.
158#[derive(Debug)]
159pub(super) struct MissingInventoryCollector {
160 /// A clone of the original request, if it is an inventory request.
161 ///
162 /// This struct is only ever created with inventory requests.
163 request: Request,
164
165 /// Used to register missing inventory from responses,
166 /// so that the peer set can route retries to other clients.
167 collector: broadcast::Sender<InventoryChange>,
168
169 /// The peer address for registering missing inventory.
170 transient_addr: PeerSocketAddr,
171}
172
173impl std::fmt::Debug for Client {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 // skip the channels, they don't tell us anything useful
176 f.debug_struct("Client")
177 .field("connection_info", &self.connection_info)
178 .field("error_slot", &self.error_slot)
179 .field("connection_task", &self.connection_task)
180 .field("heartbeat_task", &self.heartbeat_task)
181 .finish()
182 }
183}
184
185impl From<ClientRequest> for InProgressClientRequest {
186 fn from(client_request: ClientRequest) -> Self {
187 let ClientRequest {
188 request,
189 tx,
190 inv_collector,
191 transient_addr,
192 span,
193 } = client_request;
194
195 let tx = MustUseClientResponseSender::new(tx, &request, inv_collector, transient_addr);
196
197 InProgressClientRequest { request, tx, span }
198 }
199}
200
201impl ClientRequestReceiver {
202 /// Forwards to `inner.close()`.
203 pub fn close(&mut self) {
204 self.inner.close()
205 }
206
207 /// Closes `inner`, then gets the next pending [`Request`].
208 ///
209 /// Closing the channel ensures that:
210 /// - the request stream terminates, and
211 /// - task notifications are not required.
212 pub fn close_and_flush_next(&mut self) -> Option<InProgressClientRequest> {
213 self.inner.close();
214
215 // # Correctness
216 //
217 // The request stream terminates, because the sender is closed,
218 // and the channel has a limited capacity.
219 // Task notifications are not required, because the sender is closed.
220 //
221 // Despite what its documentation says, we've seen futures::channel::mpsc::Receiver::try_recv()
222 // return an error after the channel is closed.
223 self.inner.try_recv().ok().map(Into::into)
224 }
225}
226
227impl Stream for ClientRequestReceiver {
228 type Item = InProgressClientRequest;
229
230 /// Converts the successful result of `inner.poll_next()` to an
231 /// `InProgressClientRequest`.
232 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
233 match self.inner.poll_next_unpin(cx) {
234 Poll::Ready(client_request) => Poll::Ready(client_request.map(Into::into)),
235 // CORRECTNESS
236 //
237 // The current task must be scheduled for wakeup every time we
238 // return `Poll::Pending`.
239 //
240 // inner.poll_next_unpin` schedules this task for wakeup when
241 // there are new items available in the inner stream.
242 Poll::Pending => Poll::Pending,
243 }
244 }
245
246 /// Returns `inner.size_hint()`
247 fn size_hint(&self) -> (usize, Option<usize>) {
248 self.inner.size_hint()
249 }
250}
251
252impl From<mpsc::Receiver<ClientRequest>> for ClientRequestReceiver {
253 fn from(rx: mpsc::Receiver<ClientRequest>) -> Self {
254 ClientRequestReceiver { inner: rx }
255 }
256}
257
258impl MustUseClientResponseSender {
259 /// Returns a newly created client response sender for `tx`.
260 ///
261 /// If `request` or the response contains missing inventory,
262 /// it is forwarded to the `inv_collector`, for the peer at `transient_addr`.
263 pub fn new(
264 tx: oneshot::Sender<Result<Response, SharedPeerError>>,
265 request: &Request,
266 inv_collector: Option<broadcast::Sender<InventoryChange>>,
267 transient_addr: Option<PeerSocketAddr>,
268 ) -> Self {
269 Self {
270 tx: Some(tx),
271 missing_inv: MissingInventoryCollector::new(request, inv_collector, transient_addr),
272 }
273 }
274
275 /// Forwards `response` to `tx.send()`, and missing inventory to `inv_collector`,
276 /// and marks this sender as used.
277 ///
278 /// Panics if `tx.send()` is used more than once.
279 pub fn send(
280 mut self,
281 response: Result<Response, SharedPeerError>,
282 ) -> Result<(), Result<Response, SharedPeerError>> {
283 // Forward any missing inventory to the registry.
284 if let Some(missing_inv) = self.missing_inv.take() {
285 missing_inv.send(&response);
286 }
287
288 // Forward the response to the internal requester.
289 self.tx
290 .take()
291 .unwrap_or_else(|| {
292 panic!(
293 "multiple uses of response sender: response must be sent exactly once: {self:?}"
294 )
295 })
296 .send(response)
297 }
298
299 /// Returns `tx.cancellation()`.
300 ///
301 /// Panics if `tx.send()` has previously been used.
302 pub fn cancellation(&mut self) -> oneshot::Cancellation<'_, Result<Response, SharedPeerError>> {
303 self.tx
304 .as_mut()
305 .map(|tx| tx.cancellation())
306 .unwrap_or_else( || {
307 panic!("called cancellation() after using oneshot sender: oneshot must be used exactly once")
308 })
309 }
310
311 /// Returns `tx.is_canceled()`.
312 ///
313 /// Panics if `tx.send()` has previously been used.
314 pub fn is_canceled(&self) -> bool {
315 self.tx
316 .as_ref()
317 .map(|tx| tx.is_canceled())
318 .unwrap_or_else(
319 || panic!("called is_canceled() after using oneshot sender: oneshot must be used exactly once: {self:?}"))
320 }
321}
322
323impl Drop for MustUseClientResponseSender {
324 #[instrument(skip(self))]
325 fn drop(&mut self) {
326 // we don't panic if we are shutting down anyway
327 if !zebra_chain::shutdown::is_shutting_down() {
328 // is_canceled() will not panic, because we check is_none() first
329 assert!(
330 self.tx.is_none() || self.is_canceled(),
331 "unused client response sender: oneshot must be used or canceled: {self:?}"
332 );
333 }
334 }
335}
336
337impl MissingInventoryCollector {
338 /// Returns a newly created missing inventory collector, if needed.
339 ///
340 /// If `request` or the response contains missing inventory,
341 /// it is forwarded to the `inv_collector`, for the peer at `transient_addr`.
342 pub fn new(
343 request: &Request,
344 inv_collector: Option<broadcast::Sender<InventoryChange>>,
345 transient_addr: Option<PeerSocketAddr>,
346 ) -> Option<Box<MissingInventoryCollector>> {
347 if !request.is_inventory_download() {
348 return None;
349 }
350
351 if let (Some(inv_collector), Some(transient_addr)) = (inv_collector, transient_addr) {
352 Some(Box::new(MissingInventoryCollector {
353 request: request.clone(),
354 collector: inv_collector,
355 transient_addr,
356 }))
357 } else {
358 None
359 }
360 }
361
362 /// Forwards explicitly observed missing inventory to the registry.
363 ///
364 /// Only explicit `notfound` responses update the registry. Transport
365 /// failures and timeouts do not prove the peer lacks the inventory,
366 /// so they are not forwarded (prevents registry poisoning under
367 /// high sync concurrency).
368 pub fn send(self, response: &Result<Response, SharedPeerError>) {
369 let missing_inv: HashSet<InventoryHash> = match (self.request, response) {
370 // Missing block hashes from partial responses.
371 (_, Ok(Response::Blocks(block_statuses))) => block_statuses
372 .iter()
373 .filter_map(|b| b.missing())
374 .map(InventoryHash::Block)
375 .collect(),
376
377 // Missing transaction IDs from partial responses.
378 (_, Ok(Response::Transactions(tx_statuses))) => tx_statuses
379 .iter()
380 .filter_map(|tx| tx.missing())
381 .map(|tx| tx.into())
382 .collect(),
383
384 // Other response types never contain missing inventory.
385 (_, Ok(_)) => iter::empty().collect(),
386
387 // Registry-generated errors are already tracked — don't re-forward.
388 (_, Err(e)) if e.inner_debug().contains("NotFoundRegistry") => iter::empty().collect(),
389
390 // Only mark inventory as missing for explicit notfound responses.
391 // Transport failures (timeouts, dropped connections) do not prove
392 // the peer lacks the inventory — they only prove the request failed.
393 // Treating them as missing poisons the routing registry under high
394 // sync concurrency.
395 (ref request, Err(e)) if e.inner_debug().contains("NotFoundResponse") => request
396 .block_hash_inventory()
397 .into_iter()
398 .map(InventoryHash::Block)
399 .chain(
400 request
401 .transaction_id_inventory()
402 .into_iter()
403 .map(InventoryHash::from),
404 )
405 .collect(),
406
407 // All other errors (timeouts, drops, overload): don't poison.
408 (_, Err(_)) => iter::empty().collect(),
409 };
410
411 if let Some(missing_inv) =
412 InventoryChange::new_missing_multi(missing_inv.iter(), self.transient_addr)
413 {
414 // if all the receivers are closed, assume we're in tests or an isolated connection
415 let _ = self.collector.send(missing_inv);
416 }
417 }
418}
419
420impl Client {
421 /// Check if this connection's heartbeat task has exited.
422 ///
423 /// Returns an error if the heartbeat task exited. Otherwise, schedules the client task for
424 /// wakeup when the heartbeat task finishes, or the channel closes, and returns `Pending`.
425 ///
426 /// # Panics
427 ///
428 /// If the heartbeat task panicked.
429 #[allow(clippy::unwrap_in_result)]
430 fn poll_heartbeat(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
431 let is_canceled = self
432 .shutdown_tx
433 .as_mut()
434 .expect("only taken on drop")
435 .poll_canceled(cx)
436 .is_ready();
437
438 let result = match self.heartbeat_task.poll_unpin(cx) {
439 Poll::Pending => {
440 // The heartbeat task returns `Pending` while it continues to run.
441 // But if it has dropped its receiver, it is shutting down, and we should also shut down.
442 if is_canceled {
443 self.set_task_exited_error(
444 "heartbeat",
445 PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
446 )
447 } else {
448 // Heartbeat task is still running.
449 return Poll::Pending;
450 }
451 }
452 Poll::Ready(Ok(Ok(_))) => {
453 // Heartbeat task stopped unexpectedly, without panic or error.
454 self.set_task_exited_error(
455 "heartbeat",
456 PeerError::HeartbeatTaskExited(
457 "Heartbeat task stopped unexpectedly".to_string(),
458 ),
459 )
460 }
461 Poll::Ready(Ok(Err(error))) => {
462 // Heartbeat task stopped unexpectedly, with error.
463 self.set_task_exited_error(
464 "heartbeat",
465 PeerError::HeartbeatTaskExited(error.to_string()),
466 )
467 }
468 Poll::Ready(Err(error)) => {
469 // Heartbeat task panicked.
470 let error = error.panic_if_task_has_panicked();
471
472 // Heartbeat task was cancelled.
473 if error.is_cancelled() {
474 self.set_task_exited_error(
475 "heartbeat",
476 PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
477 )
478 }
479 // Heartbeat task stopped with another kind of task error.
480 else {
481 self.set_task_exited_error(
482 "heartbeat",
483 PeerError::HeartbeatTaskExited(error.to_string()),
484 )
485 }
486 }
487 };
488
489 Poll::Ready(result)
490 }
491
492 /// Check if the connection's request/response task has exited.
493 ///
494 /// Returns an error if the connection task exited. Otherwise, schedules the client task for
495 /// wakeup when the connection task finishes, and returns `Pending`.
496 ///
497 /// # Panics
498 ///
499 /// If the connection task panicked.
500 fn poll_connection(&mut self, context: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
501 // Return `Pending` if the connection task is still running.
502 let result = match ready!(self.connection_task.poll_unpin(context)) {
503 Ok(()) => {
504 // Connection task stopped unexpectedly, without panicking.
505 self.set_task_exited_error("connection", PeerError::ConnectionTaskExited)
506 }
507 Err(error) => {
508 // Connection task panicked or was cancelled.
509 let _ = error.panic_if_task_has_panicked();
510
511 // Any unexpected termination of the connection task is reported as ConnectionTaskExited.
512 self.set_task_exited_error("connection", PeerError::ConnectionTaskExited)
513 }
514 };
515
516 Poll::Ready(result)
517 }
518
519 /// Properly update the error slot after a background task has unexpectedly stopped.
520 fn set_task_exited_error(
521 &mut self,
522 task_name: &str,
523 error: PeerError,
524 ) -> Result<(), SharedPeerError> {
525 // Make sure there is an error in the slot
526 let task_error = SharedPeerError::from(error);
527 let original_error = self.error_slot.try_update_error(task_error.clone());
528 debug!(
529 ?original_error,
530 latest_error = ?task_error,
531 "client {} task exited", task_name
532 );
533
534 if let Err(AlreadyErrored { original_error }) = original_error {
535 Err(original_error)
536 } else {
537 Err(task_error)
538 }
539 }
540
541 /// Poll for space in the shared request sender channel.
542 ///
543 /// Returns an error if the sender channel is closed. If there is no space in the channel,
544 /// returns `Pending`, and schedules the task for wakeup when there is space available.
545 fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
546 let server_result = ready!(self.server_tx.poll_ready(cx));
547 if server_result.is_err() {
548 Poll::Ready(Err(self
549 .error_slot
550 .try_get_error()
551 .unwrap_or_else(|| PeerError::ConnectionTaskExited.into())))
552 } else if let Some(error) = self.error_slot.try_get_error() {
553 Poll::Ready(Err(error))
554 } else {
555 Poll::Ready(Ok(()))
556 }
557 }
558
559 /// Poll for space in the shared request sender channel, and for errors in the connection tasks.
560 ///
561 /// Returns an error if the sender channel is closed, or the heartbeat or connection tasks have
562 /// terminated. If there is no space in the channel, returns `Pending`, and schedules the task
563 /// for wakeup when there is space available, or one of the tasks terminates.
564 fn poll_client(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
565 // # Correctness
566 //
567 // The current task must be scheduled for wakeup every time we return
568 // `Poll::Pending`.
569 //
570 // `poll_heartbeat()` and `poll_connection()` schedule the client task for wakeup
571 // if either task exits, or if the heartbeat task drops the cancel handle.
572 //
573 //`ready!` returns `Poll::Pending` when `server_tx` is unready, and
574 // schedules this task for wakeup.
575 //
576 // It's ok to exit early and skip wakeups when there is an error, because the connection
577 // and its tasks are shut down immediately on error.
578
579 let _heartbeat_pending: Poll<()> = self.poll_heartbeat(cx)?;
580 let _connection_pending: Poll<()> = self.poll_connection(cx)?;
581
582 // We're only pending if the sender channel is full.
583 self.poll_request(cx)
584 }
585
586 /// Shut down the resources held by the client half of this peer connection.
587 ///
588 /// Stops further requests to the remote peer, and stops the heartbeat task.
589 fn shutdown(&mut self) {
590 // Prevent any senders from sending more messages to this peer.
591 self.server_tx.close_channel();
592
593 // Ask the heartbeat task to stop.
594 if let Some(shutdown_tx) = self.shutdown_tx.take() {
595 let _ = shutdown_tx.send(CancelHeartbeatTask);
596 }
597
598 // Force the connection and heartbeat tasks to stop.
599 self.connection_task.abort();
600 self.heartbeat_task.abort();
601 }
602}
603
604impl Service<Request> for Client {
605 type Response = Response;
606 type Error = SharedPeerError;
607 type Future =
608 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
609
610 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
611 // # Correctness
612 //
613 // The current task must be scheduled for wakeup every time we return
614 // `Poll::Pending`.
615 //
616 // `poll_client()` schedules the client task for wakeup if the sender channel has space,
617 // either connection task exits, or if the heartbeat task drops the cancel handle.
618
619 // Check all the tasks and channels.
620 //
621 //`ready!` returns `Poll::Pending` when `server_tx` is unready, and
622 // schedules this task for wakeup.
623 let result = ready!(self.poll_client(cx));
624
625 // Shut down the client and connection if there is an error.
626 if let Err(error) = result {
627 self.shutdown();
628
629 Poll::Ready(Err(error))
630 } else {
631 Poll::Ready(Ok(()))
632 }
633 }
634
635 fn call(&mut self, request: Request) -> Self::Future {
636 let (tx, rx) = oneshot::channel();
637 // get the current Span to propagate it to the peer connection task.
638 // this allows the peer connection to enter the correct tracing context
639 // when it's handling messages in the context of processing this
640 // request.
641 let span = tracing::Span::current();
642
643 match self.server_tx.try_send(ClientRequest {
644 request,
645 tx,
646 inv_collector: Some(self.inv_collector.clone()),
647 transient_addr: self.connection_info.connected_addr.get_transient_addr(),
648 span,
649 }) {
650 Err(e) => {
651 if e.is_disconnected() {
652 let peer_error = self
653 .error_slot
654 .try_get_error()
655 .unwrap_or_else(|| PeerError::ConnectionTaskExited.into());
656
657 let ClientRequest { tx, .. } = e.into_inner();
658 let _ = tx.send(Err(peer_error.clone()));
659
660 future::ready(Err(peer_error)).boxed()
661 } else {
662 // sending fails when there's not enough
663 // channel space, but we called poll_ready
664 panic!("called call without poll_ready");
665 }
666 }
667 Ok(()) => {
668 // The receiver end of the oneshot is itself a future.
669 rx.map(|oneshot_recv_result| {
670 // The ClientRequest oneshot sender should not be dropped before sending a
671 // response. But sometimes that happens during process or connection shutdown.
672 // So we just return a generic error here.
673 match oneshot_recv_result {
674 Ok(result) => result,
675 Err(oneshot::Canceled) => Err(PeerError::ConnectionDropped.into()),
676 }
677 })
678 .boxed()
679 }
680 }
681 }
682}
683
684impl Drop for Client {
685 fn drop(&mut self) {
686 // Make sure there is an error in the slot
687 let drop_error: SharedPeerError = PeerError::ClientDropped.into();
688 let original_error = self.error_slot.try_update_error(drop_error.clone());
689 debug!(
690 ?original_error,
691 latest_error = ?drop_error,
692 "client struct dropped"
693 );
694
695 self.shutdown();
696 }
697}