use crate::contacts_service::{
error::ContactsServiceError,
handle::{ContactsServiceRequest, ContactsServiceResponse},
storage::database::{ContactsBackend, ContactsDatabase},
};
use futures::{pin_mut, StreamExt};
use log::*;
use tari_service_framework::reply_channel;
use tari_shutdown::ShutdownSignal;
const LOG_TARGET: &str = "wallet:contacts_service";
pub struct ContactsService<T>
where T: ContactsBackend + 'static
{
db: ContactsDatabase<T>,
request_stream:
Option<reply_channel::Receiver<ContactsServiceRequest, Result<ContactsServiceResponse, ContactsServiceError>>>,
shutdown_signal: Option<ShutdownSignal>,
}
impl<T> ContactsService<T>
where T: ContactsBackend + 'static
{
pub fn new(
request_stream: reply_channel::Receiver<
ContactsServiceRequest,
Result<ContactsServiceResponse, ContactsServiceError>,
>,
db: ContactsDatabase<T>,
shutdown_signal: ShutdownSignal,
) -> Self
{
Self {
db,
request_stream: Some(request_stream),
shutdown_signal: Some(shutdown_signal),
}
}
pub async fn start(mut self) -> Result<(), ContactsServiceError> {
let request_stream = self
.request_stream
.take()
.expect("Contacts Service initialized without request_stream")
.fuse();
pin_mut!(request_stream);
let shutdown = self
.shutdown_signal
.take()
.expect("Output Manager Service initialized without shutdown signal");
pin_mut!(shutdown);
info!(target: LOG_TARGET, "Contacts Service started");
loop {
futures::select! {
request_context = request_stream.select_next_some() => {
let (request, reply_tx) = request_context.split();
let response = self.handle_request(request).await.map_err(|e| {
error!(target: LOG_TARGET, "Error handling request: {:?}", e);
e
});
let _ = reply_tx.send(response).map_err(|e| {
error!(target: LOG_TARGET, "Failed to send reply");
e
});
},
_ = shutdown => {
info!(target: LOG_TARGET, "Contacts service shutting down because it received the shutdown signal");
break;
}
complete => {
info!(target: LOG_TARGET, "Contacts service shutting down");
break;
}
}
}
info!(target: LOG_TARGET, "Contacts Service ended");
Ok(())
}
async fn handle_request(
&mut self,
request: ContactsServiceRequest,
) -> Result<ContactsServiceResponse, ContactsServiceError>
{
match request {
ContactsServiceRequest::GetContact(pk) => {
Ok(self.db.get_contact(pk).await.map(ContactsServiceResponse::Contact)?)
},
ContactsServiceRequest::UpsertContact(c) => {
self.db.upsert_contact(c.clone()).await?;
info!(
target: LOG_TARGET,
"Contact Saved: \nAlias: {}\nPubKey: {} ", c.alias, c.public_key
);
Ok(ContactsServiceResponse::ContactSaved)
},
ContactsServiceRequest::RemoveContact(pk) => {
let result = self.db.remove_contact(pk).await?;
info!(
target: LOG_TARGET,
"Contact Removed: \nAlias: {}\nPubKey: {} ", result.alias, result.public_key
);
Ok(ContactsServiceResponse::ContactRemoved(result))
},
ContactsServiceRequest::GetContacts => {
Ok(self.db.get_contacts().await.map(ContactsServiceResponse::Contacts)?)
},
}
}
}