Skip to main content

sozu_client/
channel.rs

1//! # Channel module
2//!
//! This module provides a `bb8` connection manager over the Sōzu command channel.
4
5use std::path::PathBuf;
6
7use bb8::ManageConnection;
8use sozu_command_lib::{
9    channel::{Channel, ChannelError},
10    config::Config,
11    proto::command::{Request, Response},
12};
13use tracing::{debug, error};
14
15use crate::socket;
16
17// -----------------------------------------------------------------------------
18// Error
19
20#[derive(thiserror::Error, Debug)]
21pub enum Error {
22    #[error("failed to load configuration, {0}")]
23    Load(crate::config::Error),
24    #[error("failed to connect to socket, {0}")]
25    Connect(socket::Error),
26    #[error("socket is unhealthy, {0}")]
27    SocketError(std::io::Error),
28    #[error("failed to set blocking the socket, {0}")]
29    Blocking(ChannelError),
30}
31
32// -----------------------------------------------------------------------------
// ConnectionProperties
34
/// Settings needed to open a channel to Sōzu.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectionProperties {
    /// Path of the socket to connect to.
    pub socket: PathBuf,
    /// Buffer size handed to the channel when it is created.
    pub buffer_size: u64,
    /// Maximum buffer size handed to the channel when it is created.
    pub max_buffer_size: u64,
}
41
42impl From<&Config> for ConnectionProperties {
43    #[tracing::instrument(skip_all)]
44    fn from(config: &Config) -> Self {
45        Self {
46            socket: PathBuf::from(&config.command_socket),
47            buffer_size: config.command_buffer_size,
48            max_buffer_size: config.max_command_buffer_size,
49        }
50    }
51}
52
53impl TryFrom<&PathBuf> for ConnectionProperties {
54    type Error = Error;
55
56    #[tracing::instrument]
57    fn try_from(path: &PathBuf) -> Result<Self, Self::Error> {
58        let config = crate::config::try_from(path).map_err(Error::Load)?;
59
60        Ok(Self::from(&config))
61    }
62}
63
64impl ConnectionProperties {
65    #[tracing::instrument]
66    fn new(socket: PathBuf, buffer_size: u64, max_buffer_size: u64) -> Self {
67        Self {
68            socket,
69            buffer_size,
70            max_buffer_size,
71        }
72    }
73}
74
75// -----------------------------------------------------------------------------
76// ConnectionManager
77
78#[derive(Clone, Debug)]
79pub struct ConnectionManager {
80    opts: ConnectionProperties,
81}
82
83impl ManageConnection for ConnectionManager {
84    type Connection = Channel<Request, Response>;
85    type Error = Error;
86
87    #[tracing::instrument(skip_all)]
88    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
89        debug!(
90            path = self.opts.socket.display().to_string(),
91            "Connect to Sōzu' socket"
92        );
93        let sock = socket::connect(&self.opts.socket)
94            .await
95            .map_err(Error::Connect)?;
96
97        let mut channel = Channel::new(sock, self.opts.buffer_size, self.opts.max_buffer_size);
98
99        channel.blocking().map_err(Error::Blocking)?;
100
101        Ok(channel)
102    }
103
104    #[tracing::instrument(skip_all)]
105    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
106        // We do not perform a ping-pong message as we know that the unix socket
107        // is working if it is present.
108        //
109        // We only check that the socket behind the connection has no error.
110        match conn.sock.take_error() {
111            Ok(Some(err)) | Err(err) => {
112                error!(error = err.to_string(), "connexion to sōzu has errors");
113                Err(Error::SocketError(err))
114            }
115            Ok(None) => Ok(()),
116        }
117    }
118
119    #[tracing::instrument(skip_all)]
120    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
121        // Check if the connection could be reused by the connection pool.
122        //
123        // We only check that the socket behind the connection has no error.
124        match conn.sock.take_error() {
125            Ok(Some(err)) | Err(err) => {
126                error!(error = err.to_string(), "connexion to sōzu has errors");
127                true
128            }
129            Ok(None) => false,
130        }
131    }
132}
133
134impl ConnectionManager {
135    #[tracing::instrument(skip_all)]
136    pub fn new(opts: ConnectionProperties) -> Self {
137        Self { opts }
138    }
139}