imbibe 0.0.1

a cosmos chain indexer
Documentation
use core::future;

use futures::StreamExt;
use imbibe_persistence::pool::DbPool;
use imbibe_querier::{
	server::Querier,
	tarpc::{Query, server::QueryServer},
};
use tarpc::{
	server::{BaseChannel, Channel, incoming::Incoming},
	tokio_serde::formats::Json,
};
use tokio::net::ToSocketAddrs;

pub async fn run<A>(pool: DbPool, sock_addr: A) -> anyhow::Result<()>
where
	A: ToSocketAddrs,
{
	let listener = tarpc::serde_transport::tcp::listen(sock_addr, Json::default).await?;

	tracing::info!(
		"querier tarpc listening port {}",
		listener.local_addr().port()
	);

	let querier = Querier::builder().pool(pool).build();
	let rpc = listener
		.filter_map(|r| future::ready(r.ok()))
		.map(BaseChannel::with_defaults)
		.max_channels_per_key(1, |t| t.transport().peer_addr().unwrap().ip())
		.map(move |channel| {
			let server = QueryServer::builder().querier(querier.clone()).build();
			channel.execute(server.serve()).for_each(async |r| {
				tokio::spawn(r);
			})
		})
		.buffer_unordered(10)
		.for_each(async |_| {});

	tokio::spawn(rpc).await?;

	Ok(())
}