watchman_client 0.8.0

a client for the Watchman file watching service
Documentation
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

//! This crate provides a client to the watchman file watching service.
//!
//! Start with the [Connector](struct.Connector.html) struct and use
//! it to connect and return a [Client](struct.Client.html) struct,
//! [Client::resolve_root](struct.Client.html#method.resolve_root) to
//! resolve a path and initiate a watch, and then
//! [Client::query](struct.Client.html#method.query) to perform
//! a query, or [Client::subscribe](struct.Client.html#method.subscribe)
//! to subscribe to file changes in real time.
//!
//! This example shows how to connect and expand a glob from the
//! current working directory:
//!
//! ```no_run
//! use watchman_client::prelude::*;
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!   let mut client = Connector::new().connect().await?;
//!   let resolved = client
//!      .resolve_root(CanonicalPath::canonicalize(".")?)
//!      .await?;
//!
//!   // Basic globs -> names
//!   let files = client.glob(&resolved, &["**/*.rs"]).await?;
//!   println!("files: {:#?}", files);
//!   Ok(())
//! }
//! ```
#![deny(warnings)]

pub mod expr;
pub mod fields;
mod named_pipe;
pub mod pdu;
use bytes::{Bytes, BytesMut};
use futures::{future::FutureExt, stream::StreamExt};
use serde_bser::de::{Bunser, SliceRead};
use serde_bser::value::Value;
use std::collections::{HashMap, VecDeque};
use std::io::{self, Write};
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
#[cfg(unix)]
use tokio::net::UnixStream;
use tokio::process::Command;
use tokio::sync::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender};
use tokio::sync::Mutex;
use tokio_util::codec::{Decoder, FramedRead};

/// The next id number to use when generating a subscription name
static SUB_ID: AtomicUsize = AtomicUsize::new(1);

/// `use watchman_client::prelude::*` for convenient access to the types
/// provided by this crate
pub mod prelude {
    pub use crate::expr::*;
    pub use crate::fields::*;
    pub use crate::pdu::*;
    pub use crate::query_result_type;
    pub use crate::{CanonicalPath, Client, Connector, ResolvedRoot};
}

use prelude::*;

#[derive(Error, Debug)]
pub enum ConnectionLost {
    #[error("Client task exited")]
    ClientTaskExited,

    #[error("Client task failed: {0}")]
    Error(String),
}

#[derive(Error, Debug)]
pub enum Error {
    #[error("Failed to connect to Watchman: {0}")]
    ConnectionError(tokio::io::Error),

    #[error("Lost connection to watchman")]
    ConnectionLost(#[from] ConnectionLost),

    #[error(
        "While invoking the {watchman_path} CLI to discover the server connection details: {reason}, stderr=`{stderr}`"
    )]
    ConnectionDiscovery {
        watchman_path: PathBuf,
        reason: String,
        stderr: String,
    },
    #[error("The watchman server reported an error: \"{}\", while executing command: {}", .message, .command)]
    WatchmanServerError { message: String, command: String },
    #[error("The watchman server reported an error: \"{}\"", .message)]
    WatchmanResponseError { message: String },
    #[error("The watchman server didn't return a value for field `{}` in response to a `{}` command. {:?}", .fieldname, .command, .response)]
    MissingField {
        fieldname: &'static str,
        command: String,
        response: String,
    },

    #[error("Deserialization error (data: {data:x?})")]
    Deserialize {
        data: Vec<u8>,
        #[source]
        source: anyhow::Error,
    },

    #[error("Seriaization error")]
    Serialize {
        #[source]
        source: anyhow::Error,
    },

    #[error("Failed to connect to {endpoint}")]
    Connect {
        endpoint: PathBuf,
        #[source]
        source: Box<std::io::Error>,
    },
}

#[derive(Error, Debug)]
enum TaskError {
    #[error("IO Error: {0}")]
    Io(#[from] std::io::Error),

    #[error("Task is shutting down")]
    Shutdown,

    #[error("EOF on Watchman socket")]
    Eof,

    #[error("Received a unilateral PDU from the server")]
    UnilateralPdu,

    #[error("Deserialization error (data: {data:x?})")]
    Deserialize {
        #[source]
        source: anyhow::Error,
        data: Vec<u8>,
    },
}

/// The Connector defines how to connect to the watchman server.
/// You will typically use `Connector::new` to set up the connection with
/// the environmental defaults.  You might want to override those defaults
/// in situations such as integration testing environments, or in extremely
/// latency sensitive environments where the cost of performing discovery
/// is a measurable overhead.
#[derive(Default)]
pub struct Connector {
    watchman_cli_path: Option<PathBuf>,
    unix_domain: Option<PathBuf>,
}

impl Connector {
    /// Set up the connector with the system defaults.
    /// If `WATCHMAN_SOCK` is set in the environment it will preset the
    /// local IPC socket path.
    /// Otherwise the connector will invoke the watchman CLI to perform
    /// discovery.
    pub fn new() -> Self {
        let connector = Self::default();

        if let Some(val) = std::env::var_os("WATCHMAN_SOCK") {
            connector.unix_domain_socket(val)
        } else {
            connector
        }
    }

    /// If the watchman CLI is installed in a location that is not present
    /// in the PATH environment variable, this method is used to inform
    /// the connector of its location.
    pub fn watchman_cli_path<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.watchman_cli_path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Specify the unix domain socket path
    pub fn unix_domain_socket<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.unix_domain = Some(path.as_ref().to_path_buf());
        self
    }

    /// Resolve the unix domain socket path, taking either the override
    /// or performing discovery.
    async fn resolve_unix_domain_path(&self) -> Result<PathBuf, Error> {
        if let Some(path) = self.unix_domain.as_ref() {
            Ok(path.clone())
        } else {
            let watchman_path = self
                .watchman_cli_path
                .as_ref()
                .map(|p| p.as_ref())
                .unwrap_or_else(|| Path::new("watchman"));

            let output = Command::new(watchman_path)
                .args(&["--output-encoding", "bser-v2", "get-sockname"])
                .output()
                .await
                .map_err(|source| Error::ConnectionDiscovery {
                    watchman_path: watchman_path.to_path_buf(),
                    reason: source.to_string(),
                    stderr: "".to_string(),
                })?;

            let info: GetSockNameResponse =
                serde_bser::from_slice(&output.stdout).map_err(|source| {
                    Error::ConnectionDiscovery {
                        watchman_path: watchman_path.to_path_buf(),
                        reason: source.to_string(),
                        stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
                    }
                })?;

            let debug = format!("{:#?}", info);

            if let Some(message) = info.error {
                return Err(Error::WatchmanServerError {
                    message,
                    command: "get-sockname".into(),
                });
            }

            info.sockname.ok_or_else(|| Error::MissingField {
                fieldname: "sockname",
                command: "get-sockname".into(),
                response: debug,
            })
        }
    }

    /// Establish a connection to the watchman server.
    /// If the connector was configured to perform discovery (which is
    /// the default configuration), then this will attempt to start
    /// the watchman server.
    pub async fn connect(&self) -> Result<Client, Error> {
        let sock_path = self.resolve_unix_domain_path().await?;

        #[cfg(unix)]
        let stream = UnixStream::connect(sock_path)
            .await
            .map_err(Error::ConnectionError)?;

        #[cfg(windows)]
        let stream = named_pipe::NamedPipe::connect(sock_path).await?;

        let stream: Box<dyn ReadWriteStream> = Box::new(stream);

        let (reader, writer) = tokio::io::split(stream);

        let (request_tx, request_rx) = tokio::sync::mpsc::channel(128);

        let mut task = ClientTask {
            writer,
            reader: FramedRead::new(reader, BserSplitter),
            request_rx,
            request_queue: VecDeque::new(),
            waiting_response: false,
            subscriptions: HashMap::new(),
        };
        tokio::spawn(async move {
            if let Err(err) = task.run().await {
                let _ignored = writeln!(io::stderr(), "watchman client task failed: {}", err);
            }
        });

        let inner = Arc::new(Mutex::new(ClientInner { request_tx }));

        Ok(Client { inner })
    }
}

/// Represents a canonical path in the filesystem.
#[derive(Debug, Clone)]
pub struct CanonicalPath(PathBuf);

impl CanonicalPath {
    /// Construct the canonical version of the supplied path.
    /// This function will canonicalize the path and return the
    /// result, if successful.
    /// If you have already canonicalized the path, it is preferable
    /// to use the `with_canonicalized_path` function instead.
    pub fn canonicalize<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
        let path = std::fs::canonicalize(path)?;
        Ok(Self(Self::strip_unc_escape(path)))
    }

    /// Construct from an already canonicalized path.
    /// This function will panic if the supplied path is not an absolute
    /// path!
    pub fn with_canonicalized_path(path: PathBuf) -> Self {
        assert!(
            path.is_absolute(),
            "attempted to call \
             CanonicalPath::with_canonicalized_path on a non-canonical path! \
             You probably want to call CanonicalPath::canonicalize instead!"
        );
        Self(Self::strip_unc_escape(path))
    }

    /// Watchman doesn't like the UNC prefix being present for incoming paths
    /// in its current implementation: we should fix that, but in the meantime
    /// we want clients to be able to connect to existing versions, so let's
    /// strip off the UNC escape
    #[cfg(windows)]
    #[inline]
    fn strip_unc_escape(path: PathBuf) -> PathBuf {
        match path.to_str() {
            Some(s) if s.starts_with("\\\\?\\") => PathBuf::from(&s[4..]),
            _ => path,
        }
    }

    #[cfg(unix)]
    #[inline]
    fn strip_unc_escape(path: PathBuf) -> PathBuf {
        path
    }
}

/// Data that describes a watched filesystem location.
/// Watchman performs watch aggregation to project boundaries, so a request
/// to watch a subdirectory will resolve to the higher level root path
/// and a relative path offset.
/// This struct encodes both pieces of information.
#[derive(Debug, Clone)]
pub struct ResolvedRoot {
    root: PathBuf,
    relative: Option<PathBuf>,
    watcher: String,
}

impl ResolvedRoot {
    /// Returns the name of the watcher that the server is using to
    /// monitor the path.  The watcher is generally system dependent,
    /// but some systems offer multipler watchers.
    /// You generally don't care too much about the watcher that is
    /// in use, but if the watcher is a virtualized filesystem such as
    /// `eden` then you may wish to use to alternative queries to get the
    /// best performance.
    pub fn watcher(&self) -> &str {
        self.watcher.as_str()
    }

    /// Returns the root of the watchman project that is being watched
    pub fn project_root(&self) -> &Path {
        &self.root
    }

    /// Returns the absolute path to the directory that you requested be resolved.
    pub fn path(&self) -> PathBuf {
        if let Some(relative) = self.relative.as_ref() {
            self.root.join(relative)
        } else {
            self.root.clone()
        }
    }

    /// Returns the path to the directory that you requested be resolved,
    /// relative to the `project_root`.
    pub fn project_relative_path(&self) -> Option<&Path> {
        self.relative.as_ref().map(PathBuf::as_ref)
    }
}

trait ReadWriteStream: AsyncRead + AsyncWrite + std::marker::Unpin + Send {}

#[cfg(unix)]
impl ReadWriteStream for UnixStream {}

struct SendRequest {
    /// The serialized request to send to the server
    buf: Vec<u8>,
    /// to pass the response back to the requstor
    tx: tokio::sync::oneshot::Sender<Result<Bytes, String>>,
}

impl SendRequest {
    fn respond(self, result: Result<Bytes, String>) {
        let _ = self.tx.send(result);
    }
}

enum SubscriptionNotification {
    Pdu(Bytes),
    Canceled,
}

enum TaskItem {
    QueueRequest(SendRequest),
    RegisterSubscription(String, UnboundedSender<SubscriptionNotification>),
}

/// Splits BSER mesages out of a stream. Does not attempt to actually decode them.
struct BserSplitter;

impl Decoder for BserSplitter {
    type Item = Bytes;
    type Error = TaskError;

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let mut bunser = Bunser::new(SliceRead::new(buf.as_ref()));

        let pdu = match bunser.read_pdu() {
            Ok(pdu) => pdu,
            Err(source) => {
                // We know that the smallest full PDU returned by the server won't ever be smaller
                // than this size. So, if we have less than BUF_SIZE bytes, ask for more data.
                const BUF_SIZE: usize = 16;

                let missing = BUF_SIZE.saturating_sub(buf.len());

                if missing > 0 {
                    buf.reserve(missing);
                    return Ok(None);
                }

                // We should have succeded in reading some data here, but we didn't. Return an
                // error.
                return Err(TaskError::Deserialize {
                    source: source.into(),
                    data: buf.to_vec(),
                });
            }
        };

        let total_size = (pdu.start + pdu.len) as usize;

        let missing = total_size.saturating_sub(buf.len());
        if missing > 0 {
            buf.reserve(missing);
            return Ok(None);
        }

        let ret = buf.split_to(total_size);
        Ok(Some(ret.freeze()))
    }
}

/// A live connection to a watchman server.
/// Use [Connector](struct.Connector.html) to establish a connection.
pub struct Client {
    inner: Arc<Mutex<ClientInner>>,
}

/// The client task coordinates sending requests with processing
/// unilateral results
struct ClientTask {
    writer: tokio::io::WriteHalf<Box<dyn ReadWriteStream>>,
    reader: FramedRead<tokio::io::ReadHalf<Box<dyn ReadWriteStream>>, BserSplitter>,
    request_rx: Receiver<TaskItem>,
    request_queue: VecDeque<SendRequest>,
    waiting_response: bool,
    subscriptions: HashMap<String, UnboundedSender<SubscriptionNotification>>,
}

impl Drop for ClientTask {
    fn drop(&mut self) {
        self.fail_all(&TaskError::Shutdown)
    }
}

impl ClientTask {
    async fn run(&mut self) -> Result<(), TaskError> {
        // process things, and if we encounter an error, ensure that
        // we fail all outstanding requests
        match self.run_loop().await {
            Err(err) => {
                self.fail_all(&err);
                Err(err)
            }
            ok => ok,
        }
    }

    async fn run_loop(&mut self) -> Result<(), TaskError> {
        loop {
            futures::select_biased! {
                pdu = self.reader.next().fuse() => {
                    match pdu {
                        Some(pdu) => self.process_pdu(pdu?).await?,
                        None => return Err(TaskError::Eof),
                    }
                }
                task = self.request_rx.recv().fuse() => {
                    match task {
                        Some(TaskItem::QueueRequest(request)) => self.queue_request(request).await?,
                        Some(TaskItem::RegisterSubscription(name, tx)) => {
                            self.register_subscription(name, tx)
                        }
                        None => break,
                    }
                }
            }
        }

        Ok(())
    }

    fn register_subscription(
        &mut self,
        name: String,
        tx: UnboundedSender<SubscriptionNotification>,
    ) {
        self.subscriptions.insert(name, tx);
    }

    /// Generate an error for each queued request.
    /// This is called in situations where the state of the connection
    /// to the serve is non-recoverable.
    fn fail_all(&mut self, err: &TaskError) {
        while let Some(request) = self.request_queue.pop_front() {
            request.respond(Err(err.to_string()));
        }
    }

    /// If we're not waiting for the response to a request,
    /// then send the next one!
    async fn send_next_request(&mut self) -> Result<(), TaskError> {
        if !self.waiting_response && !self.request_queue.is_empty() {
            match self
                .writer
                .write_all(&self.request_queue.front().expect("not empty").buf)
                .await
            {
                Err(err) => {
                    // A failed write breaks our world; we don't want to
                    // try to continue
                    return Err(err.into());
                }
                Ok(_) => self.waiting_response = true,
            }
        }
        Ok(())
    }

    /// Queue up a new request from the client code, and then
    /// check to see if we can send a queued request to the server.
    async fn queue_request(&mut self, request: SendRequest) -> Result<(), TaskError> {
        self.request_queue.push_back(request);
        self.send_next_request().await?;
        Ok(())
    }

    /// Dispatch a PDU that we just read to the appropriate client code.
    async fn process_pdu(&mut self, pdu: Bytes) -> Result<(), TaskError> {
        use serde::Deserialize;
        #[derive(Deserialize, Debug)]
        pub struct Unilateral {
            pub unilateral: bool,
            pub subscription: String,
            #[serde(default)]
            pub canceled: bool,
        }

        if let Ok(unilateral) = bunser::<Unilateral>(&pdu) {
            if let Some(subscription) = self.subscriptions.get_mut(&unilateral.subscription) {
                let msg = if unilateral.canceled {
                    SubscriptionNotification::Canceled
                } else {
                    SubscriptionNotification::Pdu(pdu)
                };

                if subscription.send(msg).is_err() || unilateral.canceled {
                    // The `Subscription` was dropped; we don't need to
                    // treat this as terminal for this client session,
                    // so just de-register the handler
                    self.subscriptions.remove(&unilateral.subscription);
                }
            }
        } else if self.waiting_response {
            let request = self
                .request_queue
                .pop_front()
                .expect("waiting_response is only true when request_queue is not empty");
            self.waiting_response = false;

            request.respond(Ok(pdu));
        } else {
            // This should never happen as we're not doing any subscription stuff
            return Err(TaskError::UnilateralPdu);
        }

        self.send_next_request().await?;
        Ok(())
    }
}

fn bunser<T>(buf: &[u8]) -> Result<T, Error>
where
    T: serde::de::DeserializeOwned,
{
    let response: T = serde_bser::from_slice(&buf).map_err(|source| Error::Deserialize {
        source: source.into(),
        data: buf.to_vec(),
    })?;
    Ok(response)
}

struct ClientInner {
    request_tx: Sender<TaskItem>,
}

impl ClientInner {
    /// This method will send a request to the watchman server
    /// and wait for its response.
    /// This is really an internal method, but it is made public in case a
    /// consumer of this crate needs to issue a command for which we haven't
    /// yet made an ergonomic wrapper.
    pub(crate) async fn generic_request<Request, Response>(
        &mut self,
        request: Request,
    ) -> Result<Response, Error>
    where
        Request: serde::Serialize + std::fmt::Debug,
        Response: serde::de::DeserializeOwned,
    {
        // Step 1: serialize into a bser byte buffer
        let mut request_data = vec![];
        serde_bser::ser::serialize(&mut request_data, &request).map_err(|source| {
            Error::Serialize {
                source: source.into(),
            }
        })?;

        // Step 2: ask the client task to send it for us
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.request_tx
            .send(TaskItem::QueueRequest(SendRequest {
                buf: request_data,
                tx,
            }))
            .await
            .map_err(|_| ConnectionLost::ClientTaskExited)?;

        // Step 3: wait for the client task to give us the response
        let pdu_data = rx
            .await
            .map_err(|_| ConnectionLost::ClientTaskExited)?
            .map_err(ConnectionLost::Error)?;

        // Step 4: sniff for an error response in the deserialized data
        use serde::Deserialize;
        #[derive(Deserialize, Debug)]
        struct MaybeError {
            #[serde(default)]
            error: Option<String>,
        }

        // Step 5: deserialize into the caller-desired format
        let maybe_err: MaybeError = bunser(&pdu_data)?;
        if let Some(message) = maybe_err.error {
            return Err(Error::WatchmanServerError {
                message,
                command: format!("{:#?}", request),
            });
        }

        let response: Response = bunser(&pdu_data)?;
        Ok(response)
    }
}

/// Returned by [Subscription::next](struct.Subscription.html#method.next)
/// as events are observed by Watchman.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum SubscriptionData<F>
where
    F: serde::de::DeserializeOwned + std::fmt::Debug + Clone + QueryFieldList,
{
    /// The Subscription was canceled.
    /// This could be for a number of reasons that are not knowable
    /// to the client:
    /// * The user may have issued the `watch-del` command
    /// * The containing watch root may have been deleted or
    ///   un-mounted
    /// * The containing watch may no longer be accessible
    ///   to the watchman user/process
    /// * Some other error condition that renders the project
    ///   unwatchable may have occurred
    /// * The server may have been gracefully shutdown
    ///
    /// A Canceled subscription will deliver no further results.
    Canceled,

    /// Files matching your criteria have changed.
    /// The QueryResult contains the details.
    /// Pay attention to the
    /// [is_fresh_instance](pdu/struct.QueryResult.html#structfield.is_fresh_instance) field!
    FilesChanged(QueryResult<F>),

    /// Some other watchman client has broadcast that the watched
    /// project is entering a new named state.
    /// For example, `hg.update` may be generated by the FB
    /// internal source control system to indicate that the
    /// working copy is about to be updated to a new revision.
    /// The metadata field contains data specific to the named
    /// state.
    StateEnter {
        state_name: String,
        metadata: Option<Value>,
    },
    /// Some other watchman client has broadcast that the watched
    /// project is no longer in the named state.
    /// This event can also be generated if the watchman client
    /// that entered the state disconnects unexpectedly from
    /// the watchman server.
    /// The `metadata` field will be `None` in that situation.
    StateLeave {
        state_name: String,
        metadata: Option<Value>,
    },
}

/// A handle to a subscription initiated via `Client::subscribe`.
/// Repeatedly call `Subscription::next().await` to yield the next
/// set of subscription results.
/// Use the `cancel` method to gracefully halt this subscription
/// if you have a program that creates and destroys subscriptions
/// throughout its lifetime.
pub struct Subscription<F>
where
    F: serde::de::DeserializeOwned + std::fmt::Debug + Clone + QueryFieldList,
{
    name: String,
    inner: Arc<Mutex<ClientInner>>,
    root: ResolvedRoot,
    responses: UnboundedReceiver<SubscriptionNotification>,
    _phantom: PhantomData<F>,
}

impl<F> Subscription<F>
where
    F: serde::de::DeserializeOwned + std::fmt::Debug + Clone + QueryFieldList,
{
    /// Returns the assigned name for this subscription instance.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Yield the next set of subscription data.
    /// An error is generated if the subscription is disconnected
    /// from the server.
    #[allow(clippy::should_implement_trait)]
    pub async fn next(&mut self) -> Result<SubscriptionData<F>, Error> {
        let msg = self
            .responses
            .recv()
            .await
            .ok_or(ConnectionLost::ClientTaskExited)?;

        match msg {
            SubscriptionNotification::Pdu(pdu) => {
                let response: QueryResult<F> = bunser(&pdu)?;

                if let Some(state_name) = response.state_enter {
                    Ok(SubscriptionData::StateEnter {
                        state_name,
                        metadata: response.state_metadata,
                    })
                } else if let Some(state_name) = response.state_leave {
                    Ok(SubscriptionData::StateLeave {
                        state_name,
                        metadata: response.state_metadata,
                    })
                } else {
                    Ok(SubscriptionData::FilesChanged(response))
                }
            }
            SubscriptionNotification::Canceled => {
                self.responses.close();
                Ok(SubscriptionData::Canceled)
            }
        }
    }

    /// Gracefully cancel this subscription.
    /// If you are imminently about to drop the associated client then you
    /// need not call this method.
    /// However, if the associated client is going to live much longer
    /// than a Subscription that you are about to drop,
    /// then it is recommended that you call `cancel` so that the server
    /// will stop delivering data about it.
    pub async fn cancel(self) -> Result<(), Error> {
        let mut inner = self.inner.lock().await;
        let _: UnsubscribeResponse = inner
            .generic_request(Unsubscribe("unsubscribe", self.root.root, self.name))
            .await?;
        Ok(())
    }
}

impl Client {
    /// This method will send a request to the watchman server
    /// and wait for its response.
    /// This is really an internal method, but it is made public in case a
    /// consumer of this crate needs to issue a command for which we haven't
    /// yet made an ergonomic wrapper.
    #[doc(hidden)]
    pub async fn generic_request<Request, Response>(
        &self,
        request: Request,
    ) -> Result<Response, Error>
    where
        Request: serde::Serialize + std::fmt::Debug,
        Response: serde::de::DeserializeOwned,
    {
        let mut inner = self.inner.lock().await;
        let response: Response = inner.generic_request(request).await?;
        Ok(response)
    }

    /// This method will attempt to assert the state named `state_name`
    /// on the watchman server. This is used to facilitate advanced settling
    /// in subscriptions.
    ///
    /// Only one client can assert a given named state for a given root at
    /// a given time; an error will be returned if another client owns the
    /// requested state assertion.
    ///
    /// If successful, the state will remain asserted until the owning client
    /// either issues a `state-leave` or disconnects from the server.
    ///
    /// The optional `metadata` will be published to all subscribers of the
    /// root and made visible via `SubscriptionData::StateEnter::metadata`.
    ///
    /// See also: <https://facebook.github.io/watchman/docs/cmd/state-enter.html>
    pub async fn state_enter(
        &self,
        root: &ResolvedRoot,
        state_name: &str,
        sync_timeout: SyncTimeout,
        metadata: Option<Value>,
    ) -> Result<(), Error> {
        let request = StateEnterLeaveRequest(
            "state-enter",
            root.root.clone(),
            StateEnterLeaveParams {
                name: state_name,
                metadata,
                sync_timeout,
            },
        );

        let _response: StateEnterLeaveResponse = self.generic_request(request).await?;
        Ok(())
    }

    /// This method will attempt to release an owned state assertion for the
    /// state named `state_name` on the watchman server. This is used to facilitate
    /// advanced settling in subscriptions.
    ///
    /// The optional `metadata` will be published to all subscribers of the
    /// root and made visible via `SubscriptionData::StateLeave::metadata`.
    ///
    /// See also: <https://facebook.github.io/watchman/docs/cmd/state-leave.html>
    pub async fn state_leave(
        &self,
        root: &ResolvedRoot,
        state_name: &str,
        sync_timeout: SyncTimeout,
        metadata: Option<Value>,
    ) -> Result<(), Error> {
        let request = StateEnterLeaveRequest(
            "state-leave",
            root.root.clone(),
            StateEnterLeaveParams {
                name: state_name,
                metadata,
                sync_timeout,
            },
        );

        let _response: StateEnterLeaveResponse = self.generic_request(request).await?;
        Ok(())
    }

    /// This is typically the first method invoked on a client.
    /// Its purpose is to ensure that the watchman server is watching the specified
    /// path and to resolve it to a `ResolvedRoot` instance.
    ///
    /// The path to resolve must be a canonical path; watchman performs strict name
    /// resolution to detect TOCTOU issues and will generate an error if the path
    /// is not the canonical name.
    ///
    /// Note that for regular filesystem watches, if the requested path is not
    /// yet being watched, this method will not yield until the watchman server
    /// has completed a recursive crawl of that portion of the filesystem.
    /// In other words, the worst case performance of this is
    /// `O(recursive-number-of-files)` and is impacted by the underlying storage
    /// device and its performance characteristics.
    pub async fn resolve_root(&self, path: CanonicalPath) -> Result<ResolvedRoot, Error> {
        let response: WatchProjectResponse = self
            .generic_request(WatchProjectRequest("watch-project", path.0.clone()))
            .await?;

        Ok(ResolvedRoot {
            root: response.watch,
            relative: response.relative_path,
            watcher: response.watcher,
        })
    }

    /// Perform a generic watchman query.
    /// The `F` type is a struct defined by the
    /// [query_result_type!](macro.query_result_type.html) macro,
    /// or, if you want only the file name from the results, the
    /// [NameOnly](struct.NameOnly.html) struct.
    ///
    /// ```
    /// use watchman_client::prelude::*;
    /// use serde::Deserialize;
    ///
    /// query_result_type! {
    ///     struct NameAndType {
    ///         name: NameField,
    ///         file_type: FileTypeField,
    ///     }
    /// }
    ///
    /// async fn query(
    ///    client: &mut Client,
    ///    resolved: &ResolvedRoot
    /// ) -> Result<(), Box<dyn std::error::Error>> {
    ///    let response: QueryResult<NameAndType> = client
    ///        .query(
    ///            &resolved,
    ///               QueryRequestCommon {
    ///                glob: Some(vec!["**/*.rs".to_string()]),
    ///                ..Default::default()
    ///            },
    ///        )
    ///        .await?;
    ///    println!("response: {:#?}", response);
    ///    Ok(())
    /// }
    /// ```
    ///
    /// When constructing your result type, you can select from the
    /// following fields:
    ///
    /// * [CTimeAsFloatField](struct.CTimeAsFloatField.html)
    /// * [CTimeField](struct.CTimeField.html)
    /// * [ContentSha1HexField](struct.ContentSha1HexField.html)
    /// * [CreatedClockField](struct.CreatedClockField.html)
    /// * [DeviceNumberField](struct.DeviceNumberField.html)
    /// * [ExistsField](struct.ExistsField.html)
    /// * [FileTypeField](struct.FileTypeField.html)
    /// * [InodeNumberField](struct.InodeNumberField.html)
    /// * [MTimeAsFloatField](struct.MTimeAsFloatField.html)
    /// * [MTimeField](struct.MTimeField.html)
    /// * [ModeAndPermissionsField](struct.ModeAndPermissionsField.html)
    /// * [NameField](struct.NameField.html)
    /// * [NewField](struct.NewField.html)
    /// * [NumberOfLinksField](struct.NumberOfLinksField.html)
    /// * [ObservedClockField](struct.ObservedClockField.html)
    /// * [OwnerGidField](struct.OwnerGidField.html)
    /// * [OwnerUidField](struct.OwnerUidField.html)
    /// * [SizeField](struct.SizeField.html)
    /// * [SymlinkTargetField](struct.SymlinkTargetField.html)
    ///
    /// (See [the fields module](fields/index.html) for a definitive list)
    ///
    /// The file names are all relative to the `root` parameter.
    pub async fn query<F>(
        &self,
        root: &ResolvedRoot,
        query: QueryRequestCommon,
    ) -> Result<QueryResult<F>, Error>
    where
        F: serde::de::DeserializeOwned + std::fmt::Debug + Clone + QueryFieldList,
    {
        let query = QueryRequest(
            "query",
            root.root.clone(),
            QueryRequestCommon {
                relative_root: root.relative.clone(),
                fields: F::field_list(),
                ..query
            },
        );

        let response: QueryResult<F> = self.generic_request(query.clone()).await?;

        Ok(response)
    }

    /// Create a Subscription that will yield file changes as they occur in
    /// real time.
    /// The `F` type is a struct defined by the
    /// [query_result_type!](macro.query_result_type.html) macro,
    /// or, if you want only the file name from the results, the
    /// [NameOnly](struct.NameOnly.html) struct.
    ///
    /// Returns two pieces of information:
    /// * A [Subscription](struct.Subscription.html) handle that can be used to yield changes
    ///   as they are observed by watchman
    /// * A [SubscribeResponse](pdu/struct.SubscribeResponse.html) that contains some data about the
    ///   state of the watch at the time the subscription was
    ///   initiated
    pub async fn subscribe<F>(
        &self,
        root: &ResolvedRoot,
        query: SubscribeRequest,
    ) -> Result<(Subscription<F>, SubscribeResponse), Error>
    where
        F: serde::de::DeserializeOwned + std::fmt::Debug + Clone + QueryFieldList,
    {
        let name = format!(
            "sub-[{}]-{}",
            std::env::args()
                .next()
                .unwrap_or_else(|| "<no-argv-0>".to_string()),
            SUB_ID.fetch_add(1, Ordering::Relaxed)
        );

        let query = SubscribeCommand(
            "subscribe",
            root.root.clone(),
            name.clone(),
            SubscribeRequest {
                relative_root: root.relative.clone(),
                fields: F::field_list(),
                ..query
            },
        );

        let (tx, responses) = tokio::sync::mpsc::unbounded_channel();

        {
            let inner = self.inner.lock().await;
            inner
                .request_tx
                .send(TaskItem::RegisterSubscription(name.clone(), tx))
                .await
                .map_err(|_| ConnectionLost::ClientTaskExited)?;
        }

        let subscription = Subscription::<F> {
            name,
            inner: Arc::clone(&self.inner),
            root: root.clone(),
            responses,
            _phantom: PhantomData,
        };

        let response: SubscribeResponse = self.generic_request(query).await?;

        Ok((subscription, response))
    }

    /// Expand a set of globs into the set of matching file names.
    /// The globs must be relative to the `root` parameter.
    /// The returned file names are all relative to the `root` parameter.
    pub async fn glob(&self, root: &ResolvedRoot, globs: &[&str]) -> Result<Vec<PathBuf>, Error> {
        let response: QueryResult<NameOnly> = self
            .query(
                root,
                QueryRequestCommon {
                    relative_root: root.relative.clone(),
                    glob: Some(globs.iter().map(|&s| s.to_string()).collect()),
                    ..Default::default()
                },
            )
            .await?;
        Ok(response
            .files
            .unwrap_or_else(Vec::new)
            .into_iter()
            .map(|f| f.name.into_inner())
            .collect())
    }

    /// Returns the current clock value for a watched root.
    /// If `sync_timeout` is `SyncTimeout::DisableCookie` then the instantaneous
    /// clock value is returned without using a sync cookie.
    ///
    /// Otherwise, a sync cookie will be created and the server will wait
    /// for up to the associated `sync_timeout` duration to observe it.
    /// If that timeout is reached, this method will yield an error.
    ///
    /// When should you use a cookie?  If you need to a clock value that is
    /// guaranteed to reflect any filesystem changes that happened before
    /// a given point in time you should use a sync cookie.
    ///
    /// ## See also:
    ///  * <https://facebook.github.io/watchman/docs/cmd/clock.html>
    ///  * <https://facebook.github.io/watchman/docs/cookies.html>
    pub async fn clock(
        &self,
        root: &ResolvedRoot,
        sync_timeout: SyncTimeout,
    ) -> Result<ClockSpec, Error> {
        let response: ClockResponse = self
            .generic_request(ClockRequest(
                "clock",
                root.root.clone(),
                ClockRequestParams { sync_timeout },
            ))
            .await?;
        Ok(response.clock)
    }

    /// Returns the current configuration for a watched root.
    pub async fn get_config(&self, root: &ResolvedRoot) -> Result<WatchmanConfig, Error> {
        let response: GetConfigResponse = self
            .generic_request(GetConfigRequest("get-config", root.root.clone()))
            .await?;
        Ok(response.config)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use futures::stream::{self, TryStreamExt};
    use serde::{Deserialize, Serialize};
    use std::io;
    use tokio_util::io::StreamReader;

    #[derive(Serialize, Deserialize, PartialEq, Debug)]
    struct TestStruct {
        value: i32,
    }

    #[test]
    fn connection_builder_paths() {
        let builder = Connector::new().unix_domain_socket("/some/path");
        assert_eq!(builder.unix_domain, Some(PathBuf::from("/some/path")));
    }

    #[tokio::test]
    async fn test_decoder() {
        async fn read_bser(buf: &[u8], chunk_size: usize) -> Vec<TestStruct> {
            let chunks = buf
                .chunks(chunk_size)
                .map(|c| Result::<_, io::Error>::Ok(Bytes::copy_from_slice(c)));

            let reader = StreamReader::new(stream::iter(chunks));

            let decoded = FramedRead::new(reader, BserSplitter)
                .map_err(TaskError::from)
                .and_then(|bytes| async move {
                    // We unwrap this since a) this is a test and b) serde_bser's errors aren't
                    // easily propagated into en error type like anyhow::Error without losing the
                    // message.
                    Ok(serde_bser::from_slice::<TestStruct>(&bytes).unwrap())
                })
                .try_collect()
                .await
                .unwrap();

            decoded
        }

        let msgs = vec![
            TestStruct { value: 1 },
            TestStruct { value: 2 },
            TestStruct { value: 3 },
        ];

        let mut buf = vec![];

        for msg in msgs.iter() {
            serde_bser::ser::serialize(&mut buf, msg).expect("Failed to write to a Vec");
        }

        // Read it with various sizes
        assert_eq!(msgs, read_bser(&buf, 1).await);
        assert_eq!(msgs, read_bser(&buf, 2).await);
        assert_eq!(msgs, read_bser(&buf, 10).await);
        assert_eq!(msgs, read_bser(&buf, buf.len()).await);
    }

    #[test]
    fn test_decoder_err() {
        let mut bytes = BytesMut::new();

        // We don't error if there isn't much data yet
        bytes.extend_from_slice(&[0; 10]);
        let r1 = BserSplitter.decode(&mut bytes);
        assert!(r1.is_ok());
        assert!(r1.unwrap().is_none());

        // We do if there is enough
        bytes.extend_from_slice(&[0; 10]);
        let r1 = BserSplitter.decode(&mut bytes);
        assert!(r1.is_err());
    }

    #[test]
    fn test_bounds() {
        fn assert_bounds<T: std::error::Error + Sync + Send + 'static>() {}
        assert_bounds::<Error>();
        assert_bounds::<TaskError>();
    }
}