graphgate-handler 0.5.1

GraphGate is Apollo Federation implemented in Rust
Documentation
use std::sync::atomic::{AtomicU64, Ordering};

use anyhow::Result;
use graphgate_planner::{Request, Response};
use http::HeaderMap;
use tokio::sync::mpsc;

use crate::websocket::WebSocketController;
use crate::ServiceRouteTable;

#[async_trait::async_trait]
pub trait Fetcher: Send + Sync {
    async fn query(&self, service: &str, request: Request) -> Result<Response>;
}

pub struct HttpFetcher<'a> {
    router_table: &'a ServiceRouteTable,
    header_map: &'a HeaderMap,
}

impl<'a> HttpFetcher<'a> {
    pub fn new(router_table: &'a ServiceRouteTable, header_map: &'a HeaderMap) -> Self {
        Self {
            router_table,
            header_map,
        }
    }
}

#[async_trait::async_trait]
impl<'a> Fetcher for HttpFetcher<'a> {
    async fn query(&self, service: &str, request: Request) -> Result<Response> {
        self.router_table
            .query(service, request, Some(self.header_map))
            .await
    }
}

pub struct WebSocketFetcher {
    controller: WebSocketController,
    id: AtomicU64,
}

impl WebSocketFetcher {
    pub fn new(controller: WebSocketController) -> Self {
        Self {
            controller,
            id: Default::default(),
        }
    }
}

#[async_trait::async_trait]
impl Fetcher for WebSocketFetcher {
    async fn query(&self, service: &str, request: Request) -> Result<Response> {
        let id = self.id.fetch_add(1, Ordering::Relaxed);
        let (tx, mut rx) = mpsc::unbounded_channel();
        self.controller
            .subscribe(format!("__req{}", id), service, request, tx)
            .await?;
        rx.recv()
            .await
            .ok_or_else(|| anyhow::anyhow!("Connection closed."))
    }
}