using System;
using System.Runtime.InteropServices;
using ZeroDDS.Core;
using ZeroDDS.Domain;
using ZeroDDS.Topic;
namespace ZeroDDS.Pub;
public sealed class Publisher : IDisposable
{
private IntPtr _handle;
private readonly IntPtr _participant;
private bool _disposed;
public Publisher(DomainParticipant dp)
{
_participant = dp.Handle;
_handle = Native.DpCreatePublisher(_participant, IntPtr.Zero);
if (_handle == IntPtr.Zero) throw new DdsError("Publisher::create failed");
}
public Publisher(DomainParticipant dp, ZeroDDS.Qos.PublisherQos qos)
{
_participant = dp.Handle;
using var scope = new ZeroDDS.QosBridge.NativeQosScope();
var native = ZeroDDS.QosBridge.QosBridge.ToNative(qos, scope);
unsafe { _handle = Native.DpCreatePublisher(_participant, (IntPtr)(&native)); }
if (_handle == IntPtr.Zero) throw new DdsError("Publisher::create with QoS failed");
}
public IntPtr Handle => _handle;
public void Suspend() =>
StatusCheck.Check(Native.PubSuspendPublications(_handle), "Publisher::Suspend");
public void Resume() =>
StatusCheck.Check(Native.PubResumePublications(_handle), "Publisher::Resume");
public void WaitForAcks(Duration timeout) =>
StatusCheck.Check(Native.PubWaitForAcks(_handle, timeout.TotalMilliseconds),
"Publisher::WaitForAcks");
public void Dispose()
{
if (_disposed) return;
_disposed = true;
if (_handle != IntPtr.Zero)
{
Native.DpDeletePublisher(_participant, _handle);
_handle = IntPtr.Zero;
}
GC.SuppressFinalize(this);
}
~Publisher() { Dispose(); }
}
public sealed class DataWriter<T> : IDisposable
{
private IntPtr _handle;
private readonly IntPtr _publisher;
private readonly ITopicTraits<T> _traits;
private bool _disposed;
public DataWriter(Publisher pub, Topic<T> topic)
{
_publisher = pub.Handle;
_handle = Native.PubCreateDatawriter(_publisher, topic.Handle, IntPtr.Zero);
if (_handle == IntPtr.Zero) throw new DdsError("DataWriter::create failed");
_traits = topic.Traits;
}
public DataWriter(Publisher pub, Topic<T> topic, ZeroDDS.Qos.DataWriterQos qos)
{
_publisher = pub.Handle;
using var scope = new ZeroDDS.QosBridge.NativeQosScope();
var native = ZeroDDS.QosBridge.QosBridge.ToNative(qos, scope);
unsafe { _handle = Native.PubCreateDatawriter(_publisher, topic.Handle, (IntPtr)(&native)); }
if (_handle == IntPtr.Zero) throw new DdsError("DataWriter::create with QoS failed");
_traits = topic.Traits;
}
public IntPtr Handle => _handle;
public void Write(T sample)
{
var bytes = _traits.Encode(sample);
unsafe
{
fixed (byte* p = bytes)
{
int rc = Native.DwWrite(_handle, (IntPtr)p, (UIntPtr)bytes.Length, 0);
StatusCheck.Check(rc, "DataWriter::Write");
}
}
}
public void WaitForAcks(Duration timeout) =>
StatusCheck.Check(Native.DwWaitForAcks(_handle, timeout.TotalMilliseconds),
"DataWriter::WaitForAcks");
public void WaitForMatched(int min, Duration timeout) =>
StatusCheck.Check(Native.DwWaitForMatched(_handle, min, timeout.TotalMilliseconds),
"DataWriter::WaitForMatched");
public void AssertLiveliness() =>
StatusCheck.Check(Native.DwAssertLiveliness(_handle), "DataWriter::AssertLiveliness");
public ZeroDDS.Status.PublicationMatchedStatus GetPublicationMatchedStatus()
{
StatusCheck.Check(Native.DwGetPublicationMatchedStatus(_handle, out var s),
"DataWriter::GetPublicationMatchedStatus");
return new ZeroDDS.Status.PublicationMatchedStatus(
s.TotalCount, s.TotalCountChange, s.CurrentCount, s.CurrentCountChange,
new InstanceHandle(s.LastSubscriptionHandle));
}
public ZeroDDS.Status.LivelinessLostStatus GetLivelinessLostStatus()
{
StatusCheck.Check(Native.DwGetLivelinessLostStatus(_handle, out var s),
"DataWriter::GetLivelinessLostStatus");
return new ZeroDDS.Status.LivelinessLostStatus(s.TotalCount, s.TotalCountChange);
}
public ZeroDDS.Status.OfferedDeadlineMissedStatus GetOfferedDeadlineMissedStatus()
{
StatusCheck.Check(Native.DwGetOfferedDeadlineMissedStatus(_handle, out var s),
"DataWriter::GetOfferedDeadlineMissedStatus");
return new ZeroDDS.Status.OfferedDeadlineMissedStatus(
s.TotalCount, s.TotalCountChange, new InstanceHandle(s.LastInstanceHandle));
}
public ZeroDDS.Status.OfferedIncompatibleQosStatus GetOfferedIncompatibleQosStatus()
{
StatusCheck.Check(Native.DwGetOfferedIncompatibleQosStatus(_handle, out var s),
"DataWriter::GetOfferedIncompatibleQosStatus");
return new ZeroDDS.Status.OfferedIncompatibleQosStatus(
s.TotalCount, s.TotalCountChange, s.LastPolicyId);
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
if (_handle != IntPtr.Zero)
{
Native.PubDeleteDatawriter(_publisher, _handle);
_handle = IntPtr.Zero;
}
GC.SuppressFinalize(this);
}
~DataWriter() { Dispose(); }
}