from __future__ import annotations
import json
from typing import List
from briefcase.events.types import BriefcaseEvent
class KafkaPublisher:
def __init__(self, brokers: List[str], topic: str) -> None:
self._brokers = brokers
self._topic = topic
self._producer = None
def _get_producer(self):
if self._producer is None:
try:
from confluent_kafka import Producer
except ImportError as exc:
raise ImportError(
"confluent-kafka is required for KafkaPublisher. "
"Install it with: pip install 'briefcase-ai[kafka]'"
) from exc
self._producer = Producer(
{"bootstrap.servers": ",".join(self._brokers)}
)
return self._producer
@staticmethod
def _serialize(event: BriefcaseEvent) -> bytes:
return json.dumps(
{
"event_type": event.event_type,
"decision_id": event.decision_id,
"timestamp": event.timestamp.isoformat(),
"idempotency_key": event.idempotency_key,
"payload": event.payload,
},
default=str,
).encode()
def publish(self, event: BriefcaseEvent) -> bool:
try:
producer = self._get_producer()
value = self._serialize(event)
producer.produce(
topic=self._topic,
key=event.idempotency_key.encode(),
value=value,
)
producer.poll(0) return True
except ImportError:
raise
except Exception:
return False