// summa 0.8.8
//
// Fast full-text search server
use crate::configs::{ApplicationConfig, ApplicationConfigBuilder, ApplicationConfigHolder, GrpcConfigBuilder, MetricsConfigBuilder};
use crate::errors::SummaResult;
use crate::logging;
use crate::metrics::register_meter;
use crate::servers::{GrpcServer, MetricsServer};
use crate::services::IndexService;
use crate::utils::signal_channel::signal_channel;
use crate::utils::thread_handler::ControlMessage;
use async_broadcast::Receiver;
use clap::{arg, command};
use futures::try_join;
use std::future::Future;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::runtime;

/// The server application.
///
/// Holds the configuration that is handed to the metrics server, the index
/// service and the gRPC server when the application is served.
pub struct Application {
    /// Configuration holder passed by reference to every component built in `serve`.
    config: ApplicationConfigHolder,
}

/// Text passed to `about` for the top-level command line interface.
///
/// Assembled line by line; the leading and trailing newlines are part of the text.
const LONG_ABOUT: &str = concat!(
    "\n",
    "Fast full-text search server.\n",
    "\n",
    "Documentation: https://izihawa.github.io/summa\n",
);

impl Application {
    pub fn from_config(config: ApplicationConfig) -> Application {
        Application {
            config: ApplicationConfigHolder::from_config(config),
        }
    }

    pub fn from_application_config_holder(config: ApplicationConfigHolder) -> Application {
        Application { config }
    }

    pub fn proceed_args() -> SummaResult<()> {
        let matches = command!()
            .name("summa")
            .override_usage("summa-server [OPTIONS] <SUBCOMMAND>")
            .subcommand_required(true)
            .arg_required_else_help(true)
            .about(LONG_ABOUT)
            .version(option_env!("CARGO_PKG_VERSION").unwrap_or("master"))
            .author("Pasha Podolsky")
            .arg(arg!(-v --verbose ... "Level of verbosity"))
            .subcommand(
                command!("generate-config")
                    .about("Generate default config file")
                    .arg(
                        arg!(-d <DATA_PATH> "Path for storing configs and data")
                            .default_value("data")
                            .required(false)
                            .takes_value(true),
                    )
                    .arg(
                        arg!(-g <GRPC_ENDPOINT> "GRPC listen endpoint")
                            .default_value("127.0.0.1:8082")
                            .required(false)
                            .takes_value(true),
                    )
                    .arg(
                        arg!(-m <METRICS_ENDPOINT> "Metrics listen endpoint")
                            .default_value("127.0.0.1:8084")
                            .required(false)
                            .takes_value(true),
                    ),
            )
            .subcommand(
                command!("serve")
                    .about("Launch search server")
                    .arg(arg!(<CONFIG> "Search engine config file").required(true).takes_value(true)),
            )
            .get_matches();

        match matches.subcommand() {
            Some(("generate-config", submatches)) => {
                let data_path = Path::new(submatches.value_of("DATA_PATH").unwrap());
                let grpc_endpoint = submatches.value_of("GRPC_ENDPOINT").unwrap();
                let metrics_endpoint = submatches.value_of("METRICS_ENDPOINT").unwrap();
                let default_config = ApplicationConfigBuilder::default()
                    .data_path(data_path.join("bin"))
                    .logs_path(data_path.join("logs"))
                    .grpc(GrpcConfigBuilder::default().endpoint(grpc_endpoint.to_owned()).build().unwrap())
                    .metrics(MetricsConfigBuilder::default().endpoint(metrics_endpoint.to_owned()).build().unwrap())
                    .build()
                    .unwrap();
                println!("{}", serde_yaml::to_string(&default_config).unwrap());
                Ok(())
            }
            Some(("serve", submatches)) => {
                let config_path = submatches.value_of("CONFIG").map(Path::new).unwrap();
                let application_config_holder = ApplicationConfigHolder::from_path(config_path)?;
                let _guards = {
                    let application_config = application_config_holder.read();
                    if application_config.debug {
                        logging::default()
                    } else {
                        logging::file(&application_config.log_path)
                    }
                };
                let app = Application::from_application_config_holder(application_config_holder);
                app.run()
            }
            _ => unreachable!("Exhausted list of subcommands and subcommand_required prevents `None`"),
        }
    }

    pub fn create_runtime(&self) -> SummaResult<runtime::Runtime> {
        Ok(runtime::Builder::new_multi_thread()
            .enable_all()
            .thread_name_fn(|| {
                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
                let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
                format!("tokio-runtime-workers-{}", id)
            })
            .build()?)
    }

    pub fn serve(&self, terminator: &Receiver<ControlMessage>) -> SummaResult<impl Future<Output = SummaResult<()>>> {
        let metrics_server_future = MetricsServer::new(&self.config)?.start(terminator.clone())?;

        let index_service = IndexService::new(&self.config);
        let grpc_server_future = GrpcServer::new(&self.config, &index_service)?.start(terminator.clone())?;

        Ok(async move {
            index_service.setup_index_holders().await?;
            register_meter(&index_service);
            try_join!(metrics_server_future, grpc_server_future)?;
            Ok(())
        })
    }

    fn run(&self) -> SummaResult<()> {
        let runtime = self.create_runtime()?;
        runtime.block_on(async move {
            let receiver = signal_channel();
            let server = self.serve(&receiver)?;
            server.await
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::configs::application_config::tests::create_test_application_config;
    use crate::proto;
    use crate::proto::index_api_client::IndexApiClient;
    use crate::search_engine::index_holder::tests::create_test_fields;
    use crate::utils::thread_handler::{ControlMessage, ThreadHandler};
    use async_broadcast::broadcast;
    use std::default::Default;
    use tonic::transport::Channel;

    /// Description of the index that both tests expect the server to report.
    fn expected_test_index() -> proto::Index {
        proto::Index {
            index_name: "test_index".to_owned(),
            index_engine: "File".to_owned(),
            ..Default::default()
        }
    }

    /// Connects an index API client to `endpoint`, panicking if the connection fails.
    async fn create_index_api_client(endpoint: &str) -> IndexApiClient<Channel> {
        let connection = IndexApiClient::connect(endpoint.to_owned()).await;
        connection.unwrap()
    }

    /// Spawns a server configured from `root_path` and connects a client to its gRPC endpoint.
    ///
    /// The config is taken from `summa.yaml` under `root_path`, with a test config
    /// rooted at the `data` subdirectory as the fallback. Returns the handler of
    /// the spawned server task together with the connected client.
    async fn create_client_server(root_path: &Path) -> SummaResult<(ThreadHandler, IndexApiClient<Channel>)> {
        let config_file = root_path.join("summa.yaml");
        let config_holder = ApplicationConfigHolder::from_path_or(config_file, || create_test_application_config(&root_path.join("data")))?;
        let grpc_endpoint = config_holder.read().grpc.endpoint.clone();

        let (server_terminator, receiver) = broadcast::<ControlMessage>(1);
        let server_future = Application::from_application_config_holder(config_holder).serve(&receiver)?;
        let thread_handler = ThreadHandler::new(tokio::spawn(server_future), server_terminator);

        let server_url = format!("http://{}", grpc_endpoint);
        let client = create_index_api_client(&server_url).await;
        Ok((thread_handler, client))
    }

    /// Sends a request creating an index named `index_name` with the `File` engine and the given `fields`.
    async fn create_index(
        index_api_client: &mut IndexApiClient<Channel>,
        index_name: &str,
        fields: &str,
    ) -> Result<tonic::Response<proto::CreateIndexResponse>, tonic::Status> {
        let request = proto::CreateIndexRequest {
            index_name: index_name.to_owned(),
            index_engine: proto::IndexEngine::File.into(),
            fields: fields.to_owned(),
            ..Default::default()
        };
        index_api_client.create_index(tonic::Request::new(request)).await
    }

    /// Creates `test_index` with the test fields serialized as YAML.
    async fn create_default_index(index_api_client: &mut IndexApiClient<Channel>) -> Result<tonic::Response<proto::CreateIndexResponse>, tonic::Status> {
        let fields_str = serde_yaml::to_string(&create_test_fields()).unwrap();
        create_index(index_api_client, "test_index", &fields_str).await
    }

    /// Creating an index on a fresh server returns the description of that index.
    #[tokio::test]
    async fn test_application() -> SummaResult<()> {
        logging::tests::initialize_default_once();
        let root_path = tempdir::TempDir::new("summa_test").unwrap();
        let (thread_handler, mut index_api_client) = create_client_server(root_path.path()).await?;

        let created = create_default_index(&mut index_api_client).await.unwrap().into_inner();
        let expected = proto::CreateIndexResponse {
            index: Some(expected_test_index()),
        };
        assert_eq!(created, expected);

        thread_handler.stop().await?;
        Ok(())
    }

    /// An index created by one server is listed by a second server started on the same root path.
    #[tokio::test]
    async fn test_persistence() -> SummaResult<()> {
        logging::tests::initialize_default_once();
        let root_path = tempdir::TempDir::new("summa_test").unwrap();

        // First server: create the index, then shut the server down.
        let (thread_handler_1, mut index_api_client_1) = create_client_server(root_path.path()).await?;
        let creation = create_default_index(&mut index_api_client_1).await;
        assert!(creation.is_ok());
        thread_handler_1.stop().await?;

        // Second server on the same root path: the index must be listed.
        let (thread_handler_2, mut index_api_client_2) = create_client_server(root_path.path()).await?;
        let listed = index_api_client_2
            .get_indices(tonic::Request::new(proto::GetIndicesRequest {}))
            .await
            .unwrap()
            .into_inner();
        let expected = proto::GetIndicesResponse {
            indices: vec![expected_test_index()],
        };
        assert_eq!(listed, expected);
        thread_handler_2.stop().await?;

        Ok(())
    }
}