sozu_client/
channel.rs

1//! # Channel module
2//!
//! This module provides a connection manager over Sōzu's command channel.
4
5use std::path::PathBuf;
6
7use bb8::ManageConnection;
8use sozu_command_lib::{
9    channel::{Channel, ChannelError},
10    config::Config,
11    proto::command::{Request, Response},
12};
13use tracing::{debug, error};
14
15use crate::socket;
16
17// -----------------------------------------------------------------------------
18// Error
19
20#[derive(thiserror::Error, Debug)]
21pub enum Error {
22    #[error("failed to load configuration, {0}")]
23    Load(crate::config::Error),
24    #[error("failed to connect to socket, {0}")]
25    Connect(socket::Error),
26    #[error("socket is unhealthy, {0}")]
27    SocketError(std::io::Error),
28    #[error("failed to set blocking the socket, {0}")]
29    Blocking(ChannelError),
30}
31
32// -----------------------------------------------------------------------------
// ConnectionProperties
34
/// Settings needed to open a channel to Sōzu.
///
/// The values are either given explicitly or copied from a Sōzu
/// configuration (`command_socket`, `command_buffer_size` and
/// `max_command_buffer_size`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectionProperties {
    /// Path of the socket to connect to.
    pub socket: PathBuf,
    /// Buffer size handed to the channel when it is created.
    pub buffer_size: u64,
    /// Maximum buffer size handed to the channel when it is created.
    pub max_buffer_size: u64,
}
41
42impl From<&Config> for ConnectionProperties {
43    #[tracing::instrument(skip_all)]
44    fn from(config: &Config) -> Self {
45        Self {
46            socket: PathBuf::from(&config.command_socket),
47            buffer_size: config.command_buffer_size,
48            max_buffer_size: config.max_command_buffer_size,
49        }
50    }
51}
52
53impl TryFrom<&PathBuf> for ConnectionProperties {
54    type Error = Error;
55
56    #[tracing::instrument]
57    fn try_from(path: &PathBuf) -> Result<Self, Self::Error> {
58        let config = crate::config::try_from(path).map_err(Error::Load)?;
59
60        Ok(Self::from(&config))
61    }
62}
63
64impl ConnectionProperties {
65    #[tracing::instrument]
66    fn new(socket: PathBuf, buffer_size: u64, max_buffer_size: u64) -> Self {
67        Self {
68            socket,
69            buffer_size,
70            max_buffer_size,
71        }
72    }
73}
74
75// -----------------------------------------------------------------------------
76// ConnectionManager
77
78#[derive(Clone, Debug)]
79pub struct ConnectionManager {
80    opts: ConnectionProperties,
81}
82
83#[async_trait::async_trait]
84impl ManageConnection for ConnectionManager {
85    type Connection = Channel<Request, Response>;
86    type Error = Error;
87
88    #[tracing::instrument(skip_all)]
89    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
90        debug!(
91            path = self.opts.socket.display().to_string(),
92            "Connect to Sōzu' socket"
93        );
94        let sock = socket::connect(&self.opts.socket)
95            .await
96            .map_err(Error::Connect)?;
97
98        let mut channel = Channel::new(sock, self.opts.buffer_size, self.opts.max_buffer_size);
99
100        channel.blocking().map_err(Error::Blocking)?;
101
102        Ok(channel)
103    }
104
105    #[tracing::instrument(skip_all)]
106    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
107        // We do not perform a ping-pong message as we know that the unix socket
108        // is working if it is present.
109        //
110        // We only check that the socket behind the connection has no error.
111        match conn.sock.take_error() {
112            Ok(Some(err)) | Err(err) => {
113                error!(error = err.to_string(), "connexion to sōzu has errors");
114                Err(Error::SocketError(err))
115            }
116            Ok(None) => Ok(()),
117        }
118    }
119
120    #[tracing::instrument(skip_all)]
121    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
122        // Check if the connection could be reused by the connection pool.
123        //
124        // We only check that the socket behind the connection has no error.
125        match conn.sock.take_error() {
126            Ok(Some(err)) | Err(err) => {
127                error!(error = err.to_string(), "connexion to sōzu has errors");
128                true
129            }
130            Ok(None) => false,
131        }
132    }
133}
134
135impl ConnectionManager {
136    #[tracing::instrument(skip_all)]
137    pub fn new(opts: ConnectionProperties) -> Self {
138        Self { opts }
139    }
140}