use crate::data::{
ActionEnum, ConfigurationReg, Frame, LuoshuDataEnum, LuoshuDataHandle, LuoshuSyncDataEnum,
Subscribe,
};
use anyhow::Result;
use async_trait::async_trait;
use luoshu_configuration::ConfiguratorStore;
use luoshu_core::Store;
use luoshu_namespace::NamespaceStore;
use luoshu_registry::RegistryStore;
use luoshu_sled_storage::LuoshuSledStorage;
use std::{collections::HashMap, net::SocketAddr};
use tokio::sync::mpsc::UnboundedSender;
use super::ServiceReg;
/// Server-side state: the three sled-backed stores plus the in-memory
/// bookkeeping needed to push updates to connected clients.
pub struct LuoshuSledData {
/// Store holding configuration entries.
pub configuration_store: ConfiguratorStore<LuoshuSledStorage>,
/// Store holding namespaces.
pub namespace_store: NamespaceStore<LuoshuSledStorage>,
/// Store holding registered services.
pub service_store: RegistryStore<LuoshuSledStorage>,
/// Configuration subscribers, keyed by the string form of a `Subscribe`
/// (`Subscribe::to_string()`); each sender receives a `Sync` frame when a
/// matching configuration is appended.
config_subscribers: HashMap<String, Vec<UnboundedSender<Frame>>>,
/// The service registered by each client address, so it can be removed from
/// `service_store` when that client's connection is reported broken.
service_book: HashMap<SocketAddr, ServiceReg>,
/// Senders that receive a `Sync` frame carrying the service store's values
/// whenever the set of services changes.
sender_list: Vec<UnboundedSender<Frame>>,
}
impl LuoshuSledData {
pub fn new() -> Self {
let storage: LuoshuSledStorage = LuoshuSledStorage::default();
let configuration_store = ConfiguratorStore::new(storage.clone());
let namespace_store = NamespaceStore::new(storage.clone());
let service_store = RegistryStore::new(storage);
LuoshuSledData {
configuration_store,
namespace_store,
service_store,
config_subscribers: HashMap::new(),
service_book: HashMap::new(),
sender_list: vec![],
}
}
fn send_sync_message(&mut self) -> Result<()> {
self.sender_list.retain(|sender| {
sender
.send(ActionEnum::Sync(self.service_store.get_values().into()).into())
.is_ok()
});
Ok(())
}
}
impl Default for LuoshuSledData {
fn default() -> Self {
Self::new()
}
}
/// [`LuoshuDataHandle`] implementation that keeps namespaces, configurations
/// and services in the sled-backed stores of [`LuoshuSledData`].
#[async_trait]
impl LuoshuDataHandle for LuoshuSledData {
    /// Stores `value` and notifies interested peers.
    ///
    /// * `Namespace` — appended to the namespace store, which is then saved.
    /// * `Configuration` — a `Sync` frame carrying the configuration is sent
    ///   to every sender subscribed under the matching `Subscribe` key
    ///   (senders whose channel rejects the frame are dropped), then the
    ///   configuration is appended to its store and the store is saved.
    /// * `Service` — recorded against `client` (when given), `sender` is
    ///   registered for service sync broadcasts (when given), the service is
    ///   appended to the service store and a sync message is broadcast.
    /// * Any other variant is ignored.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the stores' `append` / `save` calls.
    async fn append(
        &mut self,
        value: &LuoshuDataEnum,
        client: Option<SocketAddr>,
        sender: Option<&UnboundedSender<Frame>>,
    ) -> Result<()> {
        match value {
            LuoshuDataEnum::Namespace(value) => {
                self.namespace_store.append(value.into())?;
                self.namespace_store.save()?;
            }
            LuoshuDataEnum::Configuration(value) => {
                let subscribe: Subscribe = value.into();
                if let Some(subscribers) = self
                    .config_subscribers
                    .get_mut(subscribe.to_string().as_str())
                {
                    // Notify and prune in one pass. Removing by a list of
                    // previously collected indices is unsafe here: every
                    // `Vec::remove` shifts the later elements, so subsequent
                    // indices would hit the wrong sender or go out of bounds.
                    subscribers.retain(|subscriber| {
                        subscriber
                            .send(
                                ActionEnum::Sync(LuoshuSyncDataEnum::LuoshuData(
                                    value.clone().into(),
                                ))
                                .into(),
                            )
                            .is_ok()
                    });
                }
                self.configuration_store.append(value.into())?;
                self.configuration_store.save()?;
            }
            LuoshuDataEnum::Service(value) => {
                // NOTE(review): `service_book` holds a single service per
                // address, so a second registration from the same client
                // replaces the first entry — confirm this is intended.
                if let Some(client) = client {
                    self.service_book.insert(client, value.clone());
                }
                // NOTE(review): the sender is pushed on every registration, so
                // a connection registering several services is listed several
                // times — confirm whether de-duplication is wanted.
                if let Some(sender) = sender {
                    self.sender_list.push(sender.clone());
                }
                tracing::info!("Service:{:#?}", value);
                self.service_store.append(value.into())?;
                self.send_sync_message()?;
            }
            _ => {}
        };
        Ok(())
    }

    /// Removes `value` from the store that matches its variant; other
    /// variants are ignored.
    ///
    /// NOTE(review): unlike `append`, no `save()` is called here — confirm
    /// whether the stores persist removals on their own.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the store's `remove` call.
    async fn remove(&mut self, value: &LuoshuDataEnum) -> Result<()> {
        match value {
            LuoshuDataEnum::Namespace(value) => self.namespace_store.remove(value.into())?,
            LuoshuDataEnum::Configuration(value) => {
                self.configuration_store.remove(value.into())?
            }
            LuoshuDataEnum::Service(value) => self.service_store.remove(value.into())?,
            _ => {}
        };
        Ok(())
    }

    /// Currently a no-op: the incoming sync data is discarded and `Ok(())`
    /// is returned.
    async fn sync(&mut self, value: &LuoshuSyncDataEnum) -> Result<()> {
        let _ = value;
        Ok(())
    }

    /// Registers `subscriber_sender` for the configuration identified by
    /// `subscribe` and, if that configuration already exists in the store,
    /// immediately sends it to the subscriber as a `Sync` frame.
    ///
    /// # Errors
    ///
    /// Returns an error if the initial frame cannot be sent on
    /// `subscriber_sender`.
    async fn subscribe(
        &mut self,
        subscribe: Subscribe,
        subscriber_sender: &UnboundedSender<Frame>,
    ) -> Result<()> {
        // One lookup: creates the subscriber list on first use, then appends.
        self.config_subscribers
            .entry(subscribe.to_string())
            .or_default()
            .push(subscriber_sender.clone());
        if let Some(mut configurator) = self
            .configuration_store
            .get_configurations_by_namespace(subscribe.namespace.clone())
        {
            if let Some(config) = configurator.get_configuration(subscribe.name.clone()) {
                let config_reg = ConfigurationReg::new(subscribe.namespace, subscribe.name, config);
                subscriber_sender.send(
                    ActionEnum::Sync(LuoshuSyncDataEnum::LuoshuData(config_reg.into())).into(),
                )?;
            }
        };
        Ok(())
    }

    /// Handles a broken connection from `client`: if that client had a
    /// service recorded, the service is removed from the service store and a
    /// sync message is broadcast.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the service store's `remove` call.
    async fn broken(&mut self, client: SocketAddr) -> Result<()> {
        tracing::info!("连接断开: {}", client);
        if let Some(service) = self.service_book.remove(&client) {
            self.service_store.remove((&service).into())?;
            self.send_sync_message()?;
        };
        Ok(())
    }
}